package mvm.rya.indexing.accumulo.temporal;

import info.aduna.iteration.CloseableIteration;

import java.io.IOException;
import java.nio.charset.CharacterCodingException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.xml.datatype.XMLGregorianCalendar;

import mvm.rya.accumulo.AccumuloRdfConfiguration;
import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer;
import mvm.rya.accumulo.experimental.AccumuloIndexer;
import mvm.rya.api.RdfCloudTripleStoreConfiguration;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.domain.RyaURI;
import mvm.rya.api.resolver.RyaToRdfConversions;
import mvm.rya.indexing.KeyParts;
import mvm.rya.indexing.StatementContraints;
import mvm.rya.indexing.TemporalIndexer;
import mvm.rya.indexing.TemporalInstant;
import mvm.rya.indexing.TemporalInterval;
import mvm.rya.indexing.accumulo.ConfigUtils;
import mvm.rya.indexing.accumulo.StatementSerializer;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.openrdf.model.Literal;
import org.openrdf.model.Resource;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.query.QueryEvaluationException;

import cern.colt.Arrays;

/**
 * Accumulo-backed {@link TemporalIndexer}.
 *
 * Stores temporal instants and intervals extracted from statement object
 * literals into a single Accumulo index table, keyed so that before/after/
 * inside/equals queries can be answered with range scans. Instants are
 * indexed under several key shapes (see {@link KeyParts}); intervals are
 * indexed under their beginning and end keys.
 *
 * Predicate, subject and context constraints that the index cannot answer
 * server-side are filtered on the client by the iterator wrappers at the
 * bottom of this class.
 */
public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements TemporalIndexer {

    private static final Logger logger = Logger.getLogger(AccumuloTemporalIndexer.class);

    private static final String CF_INTERVAL = "interval";

    // Matches an interval literal of the form "[dateTime1,dateTime2]".
    // Compiled once: extractDateTime() runs for every stored statement and
    // Pattern.compile is comparatively expensive.
    private static final Pattern INTERVAL_PATTERN = Pattern.compile("\\[(.*)\\,(.*)\\].*");

    // Delimiter used in the interval stored in the triple's object literal.
    // So far, no ontology specifies a date range, just instants.

    private Configuration conf;

    private MultiTableBatchWriter mtbw;

    private BatchWriter temporalIndexBatchWriter;

    // Predicates whose objects should be indexed; empty means "index all".
    private Set<URI> validPredicates;
    private String temporalIndexTableName;

    private boolean isInit = false;

    /**
     * Create the index table on first run and open the batch writer.
     *
     * @throws AccumuloException on general Accumulo failure
     * @throws AccumuloSecurityException if credentials are rejected
     * @throws TableNotFoundException if the table vanishes after creation
     * @throws TableExistsException on a table-creation race
     */
    private void init() throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
            TableExistsException {
        temporalIndexTableName = ConfigUtils.getTemporalTableName(conf);
        // Create one index table on first run.
        ConfigUtils.createTableIfNotExists(conf, temporalIndexTableName);

        mtbw = ConfigUtils.createMultitableBatchWriter(conf);

        temporalIndexBatchWriter = mtbw.getBatchWriter(temporalIndexTableName);

        validPredicates = ConfigUtils.getTemporalPredicates(conf);
    }

    /**
     * Initialization occurs in setConf because the index is created using reflection.
     * Any checked failure during init() is rethrown as a RuntimeException, since
     * setConf's signature does not allow checked exceptions.
     */
    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
        if (!isInit) {
            try {
                init();
                isInit = true;
            } catch (AccumuloException | AccumuloSecurityException
                    | TableNotFoundException | TableExistsException e) {
                logger.warn("Unable to initialize index.  Throwing Runtime Exception. ", e);
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    /**
     * Store a statement in the index if it meets the criterion: the object must be
     * a literal and the predicate one of the validPredicates from the configuration
     * (an empty predicate set accepts all predicates).
     * If it does not meet the criteria, it is silently ignored.
     * Logs a warning if the object is not parse-able.
     * Attempts to parse with calendarValue = literalValue.calendarValue();
     * if that fails, tries org.joda.time.DateTime.parse().
     * TODO: parse an interval using multiple predicates for the same subject -- ontology dependent.
     *
     * @param statement candidate statement to index
     * @throws IOException if the underlying mutations are rejected
     */
    private void storeStatement(Statement statement) throws IOException, IllegalArgumentException {
        // If the predicate list is empty, accept all predicates.
        // Otherwise, make sure the predicate is on the "valid" list.
        boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate());
        if (!isValidPredicate || !(statement.getObject() instanceof Literal)) {
            return;
        }
        DateTime[] indexDateTimes = new DateTime[2]; // 0 begin, 1 end of interval
        extractDateTime(statement, indexDateTimes);
        if (indexDateTimes[0] == null) {
            return;
        }

        // Add this as an instant, or interval.
        try {
            if (indexDateTimes[1] != null) {
                TemporalInterval interval = new TemporalInterval(
                        new TemporalInstantRfc3339(indexDateTimes[0]),
                        new TemporalInstantRfc3339(indexDateTimes[1]));
                addInterval(temporalIndexBatchWriter, interval, statement);
            } else {
                TemporalInstant instant = new TemporalInstantRfc3339(indexDateTimes[0]);
                addInstant(temporalIndexBatchWriter, instant, statement);
            }
        } catch (MutationsRejectedException e) {
            throw new IOException("While adding interval/instant for statement =" + statement, e);
        }
    }

    @Override
    public void storeStatement(RyaStatement statement) throws IllegalArgumentException, IOException {
        storeStatement(RyaToRdfConversions.convertStatement(statement));
    }

    /**
     * Parse the literal dates from the object of a statement.
     * On success outputDateTimes[0] is set (and [1] for an interval); on
     * failure both slots are left/reset to null and a warning is logged.
     *
     * @param statement statement whose object literal is parsed
     * @param outputDateTimes two-element output array: 0=begin, 1=end (null for instants)
     */
    private void extractDateTime(Statement statement, DateTime[] outputDateTimes) {
        if (!(statement.getObject() instanceof Literal)) {
            // Error since it should already be tested by the caller.
            throw new RuntimeException("Statement's object must be a literal: " + statement);
        }
        String logThis = null;
        Literal literalValue = (Literal) statement.getObject();
        // First attempt to parse an interval in the form "[date1,date2]".
        Matcher matcher = INTERVAL_PATTERN.matcher(literalValue.stringValue());
        if (matcher.find()) {
            try {
                // Got a datetime pair, parse into an interval.
                outputDateTimes[0] = new DateTime(matcher.group(1));
                outputDateTimes[1] = new DateTime(matcher.group(2));
                return;
            } catch (java.lang.IllegalArgumentException e) {
                logThis = e.getMessage() + " " + logThis;
                outputDateTimes[0] = null;
                outputDateTimes[1] = null;
            }
        }

        // Second: the literal's own XSD calendar value.
        try {
            XMLGregorianCalendar calendarValue = literalValue.calendarValue();
            outputDateTimes[0] = new DateTime(calendarValue.toGregorianCalendar());
            outputDateTimes[1] = null;
            return;
        } catch (java.lang.IllegalArgumentException e) {
            logThis = e.getMessage();
        }
        // Try again using Joda Time DateTime.parse().
        try {
            outputDateTimes[0] = DateTime.parse(literalValue.stringValue());
            outputDateTimes[1] = null;
            return;
        } catch (java.lang.IllegalArgumentException e) {
            logThis = e.getMessage() + " " + logThis;
        }
        logger.warn("TemporalIndexer is unable to parse the date/time from statement=" + statement.toString() + " " + logThis);
        return;
    }

    /**
     * Index a new interval: one mutation keyed on the interval beginning and
     * one keyed on the interval end, each carrying the serialized statement.
     * TODO: integrate into KeyParts (or eliminate).
     *
     * @param writer writer for the temporal index table
     * @param interval interval to index
     * @param statement statement stored as the cell value
     * @throws MutationsRejectedException if Accumulo rejects the mutations
     */
    public void addInterval(BatchWriter writer, TemporalInterval interval, Statement statement) throws MutationsRejectedException {

        Value statementValue = new Value(StringUtils.getBytesUtf8(StatementSerializer.writeStatement(statement)));
        Text cf = new Text(StatementSerializer.writeContext(statement));
        Text cqBegin = new Text(KeyParts.CQ_BEGIN);
        Text cqEnd = new Text(KeyParts.CQ_END);

        // Start Begin index.
        Text keyText = new Text(interval.getAsKeyBeginning());
        KeyParts.appendUniqueness(statement, keyText);
        Mutation m = new Mutation(keyText);
        m.put(cf, cqBegin, statementValue);
        writer.addMutation(m);

        // Now the end index.
        keyText = new Text(interval.getAsKeyEnd());
        KeyParts.appendUniqueness(statement, keyText);
        m = new Mutation(keyText);
        m.put(cf, cqEnd, new Value(statementValue));
        writer.addMutation(m);
    }

    /**
     * Index a new instant.
     * Make indexes that handle this expression:
     *     hash( s? p? ) ?o
     *     == o union hash(s)o union hash(p)o union hash(sp)o
     *
     * @param writer writer for the temporal index table
     * @param instant instant to index
     * @param statement statement stored as the cell value
     * @throws MutationsRejectedException if Accumulo rejects the mutations
     */
    public void addInstant(BatchWriter writer, TemporalInstant instant, Statement statement) throws MutationsRejectedException {
        KeyParts keyParts = new KeyParts(statement, instant);
        for (KeyParts k : keyParts) {
            Mutation m = new Mutation(k.getStoreKey());
            m.put(k.cf, k.cq, k.getValue());
            writer.addMutation(m);
        }
    }

    /**
     * Creates a scanner and wraps all the throwables into QueryEvaluationException.
     *
     * @return a scanner over the temporal index table
     * @throws QueryEvaluationException if the scanner cannot be created
     */
    private Scanner getScanner() throws QueryEvaluationException {
        String whileDoing = "While creating a scanner for a temporal query. table name=" + temporalIndexTableName;
        Scanner scanner = null;
        try {
            scanner = ConfigUtils.createScanner(temporalIndexTableName, conf);
        } catch (AccumuloException e) {
            logger.error(whileDoing, e);
            throw new QueryEvaluationException(whileDoing, e);
        } catch (AccumuloSecurityException e) {
            // Log for consistency with the other failure modes.
            logger.error(whileDoing, e);
            throw new QueryEvaluationException(whileDoing, e);
        } catch (TableNotFoundException e) {
            logger.error(whileDoing, e);
            throw new QueryEvaluationException(whileDoing
                    + " The temporal index table should have been created by this constructor, if found missing.", e);
        }
        return scanner;
    }

    /**
     * Creates a batch scanner and wraps all the throwables into QueryEvaluationException.
     *
     * @return a batch scanner over the temporal index table
     * @throws QueryEvaluationException if the scanner cannot be created
     */
    private BatchScanner getBatchScanner() throws QueryEvaluationException {
        String whileDoing = "While creating a Batch scanner for a temporal query. table name=" + temporalIndexTableName;
        try {
            return ConfigUtils.createBatchScanner(temporalIndexTableName, conf);
        } catch (AccumuloException e) {
            logger.error(whileDoing, e);
            throw new QueryEvaluationException(whileDoing, e);
        } catch (AccumuloSecurityException e) {
            // Log for consistency with the other failure modes.
            logger.error(whileDoing, e);
            throw new QueryEvaluationException(whileDoing, e);
        } catch (TableNotFoundException e) {
            logger.error(whileDoing, e);
            throw new QueryEvaluationException(whileDoing
                    + " The temporal index table should have been created by this constructor, if found missing. ", e);
        }
    }

    /**
     * Statements where the datetime is exactly the same as the queryInstant.
     */
    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryInstantEqualsInstant(
            TemporalInstant queryInstant, StatementContraints constraints)
            throws QueryEvaluationException {
        // Get rows where the repository time is equal to the given time in queryInstant.
        Query query = new Query() {
            @Override
            public Range getRange(KeyParts keyParts) {
                return Range.prefix(keyParts.getQueryKey()); // <-- specific logic
            }
        };
        ScannerBase scanner = query.doQuery(queryInstant, constraints);
        // TODO currently context constraints are filtered on the client.
        return getContextIteratorWrapper(scanner, constraints.getContext());
    }

    /**
     * Get statements where the db row ID is BEFORE the given queryInstant.
     */
    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryInstantBeforeInstant(
            TemporalInstant queryInstant, StatementContraints constraints)
            throws QueryEvaluationException {
        // Get rows where the repository time is before the given time.
        Query query = new Query() {
            @Override
            public Range getRange(KeyParts keyParts) {
                Text start = null;
                if (keyParts.constraintPrefix != null) { // Yes, has constraints
                    start = keyParts.constraintPrefix;   // <-- start specific logic
                } else {
                    start = new Text(KeyParts.HASH_PREFIX_FOLLOWING);
                }
                Text endAt = keyParts.getQueryKey();     // <-- end specific logic
                return new Range(start, true, endAt, false);
            }
        };
        ScannerBase scanner = query.doQuery(queryInstant, constraints);
        return getContextIteratorWrapper(scanner, constraints.getContext());
    }

    /**
     * Get statements where the date object is after the given queryInstant.
     */
    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryInstantAfterInstant(
            TemporalInstant queryInstant, StatementContraints constraints)
            throws QueryEvaluationException {
        Query query = new Query() {
            @Override
            public Range getRange(KeyParts keyParts) {
                Text start = Range.followingPrefix(keyParts.getQueryKey()); // <-- specific logic
                Text endAt = null; // no constraints                        // <-- specific logic
                if (keyParts.constraintPrefix != null) { // Yes, has constraints
                    endAt = Range.followingPrefix(keyParts.constraintPrefix);
                }
                return new Range(start, true, endAt, false);
            }
        };
        ScannerBase scanner = query.doQuery(queryInstant, constraints);
        return getContextIteratorWrapper(scanner, constraints.getContext());
    }

    /**
     * Get instances before a given interval. Returns queryInstantBeforeInstant with the interval's beginning time.
     */
    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryInstantBeforeInterval(
            TemporalInterval givenInterval, StatementContraints contraints)
            throws QueryEvaluationException {
        return queryInstantBeforeInstant(givenInterval.getHasBeginning(), contraints);
    }

    /**
     * Get instances after a given interval. Returns queryInstantAfterInstant with the interval's end time.
     */
    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryInstantAfterInterval(
            TemporalInterval givenInterval, StatementContraints contraints) throws QueryEvaluationException {
        return queryInstantAfterInstant(givenInterval.getHasEnd(), contraints);
    }

    /**
     * Get instances inside a given interval.
     * Returns after interval's beginning time, and before ending time,
     * exclusive (don't match the beginning and ending).
     */
    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryInstantInsideInterval(
            TemporalInterval queryInterval, StatementContraints constraints)
            throws QueryEvaluationException {
        // Get rows where the time is after the given interval's beginning time and before the ending time.
        final TemporalInterval theQueryInterval = queryInterval;
        Query query = new Query() {
            private final TemporalInterval queryInterval = theQueryInterval;
            @Override
            public Range getRange(KeyParts keyParts) {
                Text start = Range.followingPrefix(new Text(keyParts.getQueryKey(queryInterval.getHasBeginning())));
                Text endAt = new Text(keyParts.getQueryKey(queryInterval.getHasEnd())); // <-- end specific logic
                return new Range(start, false, endAt, false);
            }
        };
        ScannerBase scanner = query.doQuery(queryInterval.getHasBeginning(), constraints);
        return getContextIteratorWrapper(scanner, constraints.getContext());
    }

    /**
     * Get instances matching the beginning of a given interval.
     */
    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryInstantHasBeginningInterval(
            TemporalInterval queryInterval, StatementContraints contraints)
            throws QueryEvaluationException {
        return queryInstantEqualsInstant(queryInterval.getHasBeginning(), contraints);
    }

    /**
     * Get instances matching the ending of a given interval.
     */
    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryInstantHasEndInterval(
            TemporalInterval queryInterval, StatementContraints contraints)
            throws QueryEvaluationException {
        return queryInstantEqualsInstant(queryInterval.getHasEnd(), contraints);
    }

    /**
     * Get intervals stored in the repository matching the given interval.
     * Indexing Intervals will probably change or be removed.
     * Currently predicate and subject constraints are filtered on the client.
     */
    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryIntervalEquals(
            TemporalInterval query, StatementContraints contraints)
            throws QueryEvaluationException {
        Scanner scanner = getScanner();
        if (scanner != null) {
            // Get rows where the start and end match.
            Range range = Range.prefix(new Text(query.getAsKeyBeginning()));
            scanner.setRange(range);
            if (contraints.hasContext()) {
                scanner.fetchColumn(new Text(contraints.getContext().toString()), new Text(KeyParts.CQ_BEGIN));
            } else {
                scanner.fetchColumn(new Text(""), new Text(KeyParts.CQ_BEGIN));
            }
        }
        return getIteratorWrapper(scanner);
    }

    /**
     * Find intervals stored in the repository before the given interval. Find interval endings that are
     * before the given beginning.
     * Indexing Intervals will probably change or be removed.
     * Currently predicate and subject constraints are filtered on the client.
     */
    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryIntervalBefore(
            TemporalInterval queryInterval, StatementContraints constraints) throws QueryEvaluationException {
        Scanner scanner = getScanner();
        if (scanner != null) {
            // Get rows where the end date is less than the queryInterval.getBefore().
            Range range = new Range(null, false, new Key(new Text(queryInterval.getHasBeginning().getAsKeyBytes())), false);
            scanner.setRange(range);
            if (constraints.hasContext()) {
                scanner.fetchColumn(new Text(constraints.getContext().toString()), new Text(KeyParts.CQ_END));
            } else {
                scanner.fetchColumn(new Text(""), new Text(KeyParts.CQ_END));
            }
        }
        return getIteratorWrapper(scanner);
    }

    /**
     * Interval after given interval. Find intervals that begin after the endings of the given interval.
     * Use the special following prefix mechanism to avoid matching the beginning date.
     * Indexing Intervals will probably change or be removed.
     * Currently predicate and subject and context constraints are filtered on the client.
     */
    @Override
    public CloseableIteration<Statement, QueryEvaluationException> queryIntervalAfter(
            TemporalInterval queryInterval, StatementContraints constraints)
            throws QueryEvaluationException {

        Scanner scanner = getScanner();
        if (scanner != null) {
            // Get rows where the start date is greater than the queryInterval.getEnd().
            Range range = new Range(new Key(Range.followingPrefix(new Text(queryInterval.getHasEnd().getAsKeyBytes()))), false, null, true);
            scanner.setRange(range);

            if (constraints.hasContext()) {
                scanner.fetchColumn(new Text(constraints.getContext().toString()), new Text(KeyParts.CQ_BEGIN));
            } else {
                scanner.fetchColumn(new Text(""), new Text(KeyParts.CQ_BEGIN));
            }
        }
        // TODO currently predicate, subject and context constraints are filtered on the client.
        return getIteratorWrapper(scanner);
    }
    // --
    // -- END of Query functions. Next up, general stuff used by the queries above.
    // --

    /**
     * Allows passing range specific logic into doQuery.
     * Each query function implements an anonymous instance of this and calls its doQuery().
     */
    abstract class Query {
        abstract protected Range getRange(KeyParts keyParts);

        /**
         * Build the scanner for a query: one range per key-part variant
         * (constraint prefix + time, or just time). A BatchScanner is used
         * when more than one range is needed.
         *
         * @param queryInstant instant the ranges are derived from
         * @param constraints optional subject/predicate/context constraints
         * @return a configured scanner, ready to iterate
         * @throws QueryEvaluationException if the scanner cannot be created
         */
        public ScannerBase doQuery(TemporalInstant queryInstant, StatementContraints constraints) throws QueryEvaluationException {
            // Key is constraintPrefix + time, or just time. If the constraints
            // are empty, thisKeyParts.constraintPrefix will be null.
            List<KeyParts> keyParts = KeyParts.keyPartsForQuery(queryInstant, constraints);
            ScannerBase scanner = null;
            if (keyParts.size() > 1) {
                scanner = getBatchScanner();
            } else {
                scanner = getScanner();
            }

            Collection<Range> ranges = new HashSet<Range>();
            KeyParts lastKeyParts = null;
            Range range = null;
            for (KeyParts thisKeyParts : keyParts) {
                range = this.getRange(thisKeyParts);
                ranges.add(range);
                lastKeyParts = thisKeyParts;
            }
            // NOTE(review): assumes keyPartsForQuery never returns an empty
            // list (lastKeyParts would be null here) -- TODO confirm.
            scanner.fetchColumn(new Text(lastKeyParts.cf), new Text(lastKeyParts.cq));
            if (scanner instanceof BatchScanner) {
                ((BatchScanner) scanner).setRanges(ranges);
            } else if (range != null) {
                ((Scanner) scanner).setRange(range);
            }
            return scanner;
        }
    }

    /**
     * An iteration wrapper for a loaded scanner that is returned for each query above.
     *
     * @param scanner the results to iterate, then close.
     * @return an anonymous object that will iterate the resulting statements from a given scanner.
     */
    private static CloseableIteration<Statement, QueryEvaluationException> getIteratorWrapper(final ScannerBase scanner) {

        final Iterator<Entry<Key, Value>> i = scanner.iterator();

        return new CloseableIteration<Statement, QueryEvaluationException>() {
            @Override
            public boolean hasNext() {
                return i.hasNext();
            }

            @Override
            public Statement next() throws QueryEvaluationException {
                Entry<Key, Value> entry = i.next();
                Value v = entry.getValue();
                try {
                    String dataString = Text.decode(v.get(), 0, v.getSize());
                    Statement s = StatementSerializer.readStatement(dataString);
                    return s;
                } catch (CharacterCodingException e) {
                    logger.error("Error decoding value=" + Arrays.toString(v.get()), e);
                    throw new QueryEvaluationException(e);
                } catch (IOException e) {
                    logger.error("Error de-serializing statement, string=" + v.get(), e);
                    throw new QueryEvaluationException(e);
                }
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException("Remove not implemented");
            }

            @Override
            public void close() throws QueryEvaluationException {
                scanner.close();
            }
        };
    }

    /**
     * An iteration wrapper for a loaded scanner that is returned for partially supported interval queries above.
     *
     * @param scanner the results to iterate, then close.
     * @param constraints limit statements returned by next() to those matching the constraints.
     * @return an anonymous object that will iterate the resulting statements from a given scanner.
     */
    private static CloseableIteration<Statement, QueryEvaluationException> getConstrainedIteratorWrapper(final Scanner scanner, final StatementContraints constraints) {
        if (!constraints.hasContext() && !constraints.hasSubject() && !constraints.hasPredicates()) {
            return getIteratorWrapper(scanner);
        }
        return new ConstrainedIteratorWrapper(scanner) {
            @Override
            public boolean allowedBy(Statement statement) {
                return allowedByConstraints(statement, constraints);
            }
        };
    }

    /**
     * An iteration wrapper for a loaded scanner that is returned for queries above.
     * Currently, this temporal index supports contexts only on the client, using this filter.
     *
     * @param scanner the results to iterate, then close.
     * @param context limit statements returned by next() to those matching this context; null allows all.
     * @return an anonymous object that will iterate the resulting statements from a given scanner.
     */
    private static CloseableIteration<Statement, QueryEvaluationException> getContextIteratorWrapper(final ScannerBase scanner, final Resource context) {
        if (context == null) {
            return getIteratorWrapper(scanner);
        }
        return new ConstrainedIteratorWrapper(scanner) {
            @Override
            public boolean allowedBy(Statement statement) {
                return allowedByContext(statement, context);
            }
        };
    }

    /**
     * Wrap a scanner in an iterator that will filter statements based on a boolean allowedBy().
     * If the allowedBy function returns false for the next statement, it is skipped.
     * This is used to do client side what the index cannot (yet) do on the server side.
     */
    abstract static class ConstrainedIteratorWrapper implements CloseableIteration<Statement, QueryEvaluationException> {
        // Look-ahead element: the next statement that passed allowedBy(), or null when exhausted.
        private Statement nextStatement = null;
        private boolean isInitialized = false;
        final private Iterator<Entry<Key, Value>> i;
        final private ScannerBase scanner;

        ConstrainedIteratorWrapper(ScannerBase scanner) {
            this.scanner = scanner;
            i = scanner.iterator();
        }

        @Override
        public boolean hasNext() throws QueryEvaluationException {
            if (!isInitialized) {
                internalGetNext();
            }
            return (nextStatement != null);
        }

        @Override
        public Statement next() throws QueryEvaluationException {
            if (nextStatement == null) {
                if (!isInitialized) {
                    internalGetNext();
                }
                if (nextStatement == null) {
                    throw new NoSuchElementException();
                }
            }
            // Use this one, then get the next one loaded.
            Statement thisStatement = this.nextStatement;
            internalGetNext();
            return thisStatement;
        }

        /**
         * Gets the next statement meeting constraints and stores it in nextStatement.
         * Sets null when all done, or on exception.
         *
         * @throws QueryEvaluationException on decode or deserialization failure
         */
        private void internalGetNext() throws QueryEvaluationException {
            isInitialized = true;
            this.nextStatement = null; // Default on done or error.
            Statement statement = null;
            while (i.hasNext()) {
                Entry<Key, Value> entry = i.next();
                Value v = entry.getValue();
                try {
                    String dataString = Text.decode(v.get(), 0, v.getSize());
                    statement = StatementSerializer.readStatement(dataString);
                } catch (CharacterCodingException e) {
                    logger.error("Error decoding value=" + Arrays.toString(v.get()), e);
                    throw new QueryEvaluationException(e);
                } catch (IOException e) {
                    logger.error("Error de-serializing statement, string=" + v.get(), e);
                    throw new QueryEvaluationException(e);
                }
                if (allowedBy(statement)) {
                    this.nextStatement = statement;
                    return;
                }
            }
        }

        public abstract boolean allowedBy(Statement s);

        @Override
        public void remove() {
            throw new UnsupportedOperationException("Remove not implemented");
        }

        @Override
        public void close() throws QueryEvaluationException {
            scanner.close();
        }
    }

    /**
     * Does the statement meet the constraints? Match predicate, subject, and context.
     *
     * @param statement candidate statement to be allowed or not.
     * @param constraints fields that are non-null must match the statement's components, otherwise it is not allowed.
     * @return true if the parts of the statement match the statementConstraints' parts.
     */
    protected static boolean allowedByConstraints(Statement statement, StatementContraints constraints) {

        if (constraints.hasSubject() && !constraints.getSubject().toString().equals(statement.getSubject().toString())) {
            // Debug output goes through the logger, not System.out.
            logger.debug("Constrain subject: " + constraints.getSubject() + " != " + statement.getSubject());
            return false;
        }

        if (!allowedByContext(statement, constraints.getContext())) {
            return false;
        }

        if (constraints.hasPredicates() && !constraints.getPredicates().contains(statement.getPredicate())) {
            return false;
        }

        logger.debug("allow statement: " + statement.toString());
        return true;
    }

    /**
     * Allow only if the context matches the statement. This is a client side filter.
     *
     * @param statement statement whose context is examined
     * @param context required context; null allows any
     * @return true when context is null or equals the statement's context
     */
    protected static boolean allowedByContext(Statement statement, Resource context) {
        return context == null || context.equals(statement.getContext());
    }

    @Override
    public Set<URI> getIndexablePredicates() {
        return validPredicates;
    }

    /**
     * Flush the data to the batch writer.
     * Throws an IOException as required by the Flushable interface,
     * wrapping MutationsRejectedException.
     */
    @Override
    public void flush() throws IOException {
        try {
            mtbw.flush();
        } catch (MutationsRejectedException e) {
            String msg = "Error while flushing the batch writer.";
            logger.error(msg, e);
            throw new IOException(msg, e);
        }
    }

    /**
     * Close the batch writer.
     * Throws an IOException as required by the Closeable interface,
     * wrapping MutationsRejectedException.
     */
    @Override
    public void close() throws IOException {
        try {
            mtbw.close();
        } catch (MutationsRejectedException e) {
            String msg = "Error while closing the batch writer.";
            logger.error(msg, e);
            throw new IOException(msg, e);
        }
    }

    @Override
    public String getTableName() {
        return ConfigUtils.getTemporalTableName(conf);
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalInstantRfc3339.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalInstantRfc3339.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalInstantRfc3339.java new file mode 100644 index 0000000..917095b --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalInstantRfc3339.java @@ -0,0 +1,198 @@ +/** + * + */ +package mvm.rya.indexing.accumulo.temporal; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import mvm.rya.indexing.TemporalInstant; +import mvm.rya.indexing.TemporalInterval; + +import org.apache.commons.codec.binary.StringUtils; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; + +/** + * Immutable date and time instance returning a human readable key. + * Preserves the Time zone, but not stored in the key. + * Converts fields (hours, etc) correctly for tz=Zulu when stored, + * so the original timezone is not preserved when retrieved. + * + * Uses rfc 3339, which looks like: YYYY-MM-DDThh:mm:ssZ a subset + * of ISO-8601 : https://www.ietf.org/rfc/rfc3339.txt + * + * Limits: All dates and times are assumed to be in the "current era", no BC, + * somewhere between 0000AD and 9999AD. + * + * Resolution: to the second, or millisecond if the optional fraction is used. + * + * This is really a wrapper for Joda DateTime. if you need functionality from + * that wonderful class, simply use t.getAsDateTime(). 
+ * + */ +public class TemporalInstantRfc3339 implements TemporalInstant { + + private static final long serialVersionUID = -7790000399142290309L; + + private final DateTime dateTime; + /** + * Format key like this: YYYY-MM-DDThh:mm:ssZ + */ + public final static DateTimeFormatter FORMATTER = ISODateTimeFormat.dateTimeNoMillis(); + + /** + * New date assumed UTC time zone. + * + * @param year + * @param month + * @param day + * @param hour + * @param minute + * @param second + */ + public TemporalInstantRfc3339(int year, int month, int day, int hour, int minute, int second) { + dateTime = new DateTime(year, month, day, hour, minute, second, DateTimeZone.UTC); + } + + /** + * Construct with a Joda/java v8 DateTime; + * TZ is preserved, but not in the key. + * + * @param dateTime + * initialize with this date time. Converted to zulu time zone for key generation. + * @return + */ + public TemporalInstantRfc3339(DateTime datetime) { + this.dateTime = datetime; + } + /** + * Get an interval setting beginning and end with this implementation of {@link TemporalInstant}. + * beginning must be less than end. + * + * @param dateTimeInterval String in the form [dateTime1,dateTime2] + */ + public static TemporalInterval parseInterval(String dateTimeInterval) { + + Matcher matcher = Pattern.compile("\\[(.*)\\,(.*)\\].*").matcher(dateTimeInterval); + if (matcher.find()) { + // Got a date time pair, parse into an interval. 
+ return new TemporalInterval( + new TemporalInstantRfc3339(new DateTime(matcher.group(1))), + new TemporalInstantRfc3339(new DateTime(matcher.group(2)))); + } + throw new IllegalArgumentException("Can't parse interval, expecting '[ISO8601dateTime1,ISO8601dateTime2]', actual: "+dateTimeInterval); + } + + /** + * if this is older returns -1, equal 0, else 1 + * + */ + @Override + public int compareTo(TemporalInstant that) { + return this.getAsKeyString().compareTo(that.getAsKeyString()); + } + + @Override + public byte[] getAsKeyBytes() { + return StringUtils.getBytesUtf8(getAsKeyString()); + } + + @Override + public String getAsKeyString() { + return dateTime.withZone(DateTimeZone.UTC).toString(FORMATTER); + } + + /** + * Readable string, formated local time at {@link DateTimeZone}. + * If the timezone is UTC (Z), it was probably a key from the database. + * If the server and client are in different Time zone, should probably use the client timezone. + * + * Time at specified time zone: + * instant.getAsReadable(DateTimeZone.forID("-05:00"))); + * instant.getAsReadable(DateTimeZone.getDefault())); + * + * Use original time zone set in the constructor: + * instant.getAsDateTime().toString(TemporalInstantRfc3339.FORMATTER)); + * + */ + @Override + public String getAsReadable(DateTimeZone dateTimeZone) { + return dateTime.withZone(dateTimeZone).toString(FORMATTER); + } + + /** + * Use original time zone set in the constructor, or UTC if from parsing the key. + */ + @Override + public String getAsReadable() { + return dateTime.toString(FORMATTER); + } + + /** + * default toString, same as getAsReadable(). + */ + @Override + public String toString() { + return getAsReadable(); + } + + /** + * Show readable time converted to the default timezone. + */ + @Override + public DateTime getAsDateTime() { + return dateTime; + } + + /** + * Minimum Date, used for infinitely past. 
+ */ + private static final TemporalInstant MINIMUM = new TemporalInstantRfc3339(new DateTime(Long.MIN_VALUE)); + /** + * maximum date/time is used for infinitely in the future. + */ + private static final TemporalInstant MAXIMUM = new TemporalInstantRfc3339(new DateTime(Long.MAX_VALUE)); + + /** + * infinite past date. + * @return an instant that will compare as NEWER than anything but itself. + */ + public static TemporalInstant getMinimumInstance() { + return MINIMUM; + } + /** + * infinite future date. + * @return an instant that will compare as OLDER than anything but itself + */ + + public static TemporalInstant getMaximumInstance() { + return MAXIMUM; + } + + /* (non-Javadoc) + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + return this.getAsKeyString().hashCode(); + } + + /* (non-Javadoc) + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + TemporalInstantRfc3339 other = (TemporalInstantRfc3339) obj; + return (this.getAsKeyString().equals(other.getAsKeyString())); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalTupleSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalTupleSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalTupleSet.java new file mode 100644 index 0000000..66bedce --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalTupleSet.java @@ -0,0 +1,300 @@ +package mvm.rya.indexing.accumulo.temporal; + +import info.aduna.iteration.CloseableIteration; + +import java.util.Map; +import java.util.Set; + +import mvm.rya.indexing.IndexingExpr; +import 
mvm.rya.indexing.IteratorFactory; +import mvm.rya.indexing.SearchFunction; +import mvm.rya.indexing.SearchFunctionFactory; +import mvm.rya.indexing.StatementContraints; +import mvm.rya.indexing.TemporalIndexer; +import mvm.rya.indexing.TemporalInstant; +import mvm.rya.indexing.TemporalInterval; +import mvm.rya.indexing.accumulo.geo.GeoTupleSet; +import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; + +import org.apache.hadoop.conf.Configuration; +import org.joda.time.DateTime; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.QueryModelVisitor; + +import com.google.common.base.Joiner; +import com.google.common.collect.Maps; + +//Indexing Node for temporal expressions to be inserted into execution plan +//to delegate temporal portion of query to temporal index +public class TemporalTupleSet extends ExternalTupleSet { + + private Configuration conf; + private TemporalIndexer temporalIndexer; + private IndexingExpr filterInfo; + + + public TemporalTupleSet(IndexingExpr filterInfo, TemporalIndexer temporalIndexer) { + this.filterInfo = filterInfo; + this.temporalIndexer = temporalIndexer; + this.conf = temporalIndexer.getConf(); + } + + /** + * {@inheritDoc} + */ + @Override + public Set<String> getBindingNames() { + return filterInfo.getBindingNames(); + } + + /** + * {@inheritDoc} + * <p> + * Note that we need a deep copy for everything that (during optimizations) + * can be altered via {@link #visitChildren(QueryModelVisitor)} + */ + public TemporalTupleSet clone() { + return new TemporalTupleSet(filterInfo, temporalIndexer); + } + + @Override + public double cardinality() { + return 0.0; // No idea how the estimate cardinality here. 
+ } + + + @Override + public String getSignature() { + + return "(TemporalTuple Projection) " + "variables: " + Joiner.on(", ").join(this.getBindingNames()).replaceAll("\\s+", " "); + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + if (!(other instanceof TemporalTupleSet)) { + return false; + } + TemporalTupleSet arg = (TemporalTupleSet) other; + return this.filterInfo.equals(arg.filterInfo); + } + + + @Override + public int hashCode() { + int result = 17; + result = 31*result + filterInfo.hashCode(); + + return result; + } + + + /** + * Returns an iterator over the result set associated with contained IndexingExpr. + * <p> + * Should be thread-safe (concurrent invocation {@link OfflineIterable} this + * method can be expected with some query evaluators. + */ + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) + throws QueryEvaluationException { + + + URI funcURI = filterInfo.getFunction(); + SearchFunction searchFunction = (new TemporalSearchFunctionFactory(conf)).getSearchFunction(funcURI); + + if(filterInfo.getArguments().length > 1) { + throw new IllegalArgumentException("Index functions do not support more than two arguments."); + } + + String queryText = filterInfo.getArguments()[0].stringValue(); + + return IteratorFactory.getIterator(filterInfo.getSpConstraint(), bindings, queryText, searchFunction); + } + + + //returns appropriate search function for a given URI + //search functions used by TemporalIndexer to query Temporal Index + private class TemporalSearchFunctionFactory { + + private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap(); + Configuration conf; + + public TemporalSearchFunctionFactory(Configuration conf) { + this.conf = conf; + } + + + /** + * Get a {@link TemporalSearchFunction} for a give URI. 
+ * + * @param searchFunction + * @return + */ + public SearchFunction getSearchFunction(final URI searchFunction) { + + SearchFunction geoFunc = null; + + try { + geoFunc = getSearchFunctionInternal(searchFunction); + } catch (QueryEvaluationException e) { + e.printStackTrace(); + } + + return geoFunc; + } + + private SearchFunction getSearchFunctionInternal(final URI searchFunction) throws QueryEvaluationException { + SearchFunction sf = SEARCH_FUNCTION_MAP.get(searchFunction); + + if (sf != null) { + return sf; + } else { + throw new QueryEvaluationException("Unknown Search Function: " + searchFunction.stringValue()); + } + + + } + + + + private final SearchFunction TEMPORAL_InstantAfterInstant = new SearchFunction() { + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, + StatementContraints contraints) throws QueryEvaluationException { + TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms)); + return temporalIndexer.queryInstantAfterInstant(queryInstant, contraints); + } + + @Override + public String toString() { + return "TEMPORAL_InstantAfterInstant"; + }; + }; + private final SearchFunction TEMPORAL_InstantBeforeInstant = new SearchFunction() { + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, + StatementContraints contraints) throws QueryEvaluationException { + TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms)); + return temporalIndexer.queryInstantBeforeInstant(queryInstant, contraints); + } + + @Override + public String toString() { + return "TEMPORAL_InstantBeforeInstant"; + }; + }; + + private final SearchFunction TEMPORAL_InstantEqualsInstant = new SearchFunction() { + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, + StatementContraints contraints) throws QueryEvaluationException { + TemporalInstant 
queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms)); + return temporalIndexer.queryInstantEqualsInstant(queryInstant, contraints); + } + + @Override + public String toString() { + return "TEMPORAL_InstantEqualsInstant"; + }; + }; + + private final SearchFunction TEMPORAL_InstantAfterInterval = new SearchFunction() { + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, + StatementContraints contraints) throws QueryEvaluationException { + TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); + return temporalIndexer.queryInstantAfterInterval(queryInterval, contraints); + } + + @Override + public String toString() { + return "TEMPORAL_InstantAfterInterval"; + }; + }; + + private final SearchFunction TEMPORAL_InstantBeforeInterval = new SearchFunction() { + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, + StatementContraints contraints) throws QueryEvaluationException { + TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); + return temporalIndexer.queryInstantBeforeInterval(queryInterval, contraints); + } + + @Override + public String toString() { + return "TEMPORAL_InstantBeforeInterval"; + }; + }; + + private final SearchFunction TEMPORAL_InstantInsideInterval = new SearchFunction() { + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, + StatementContraints contraints) throws QueryEvaluationException { + TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); + return temporalIndexer.queryInstantInsideInterval(queryInterval, contraints); + } + + @Override + public String toString() { + return "TEMPORAL_InstantInsideInterval"; + }; + }; + + private final SearchFunction TEMPORAL_InstantHasBeginningInterval = new SearchFunction() { + @Override + public CloseableIteration<Statement, 
QueryEvaluationException> performSearch(String searchTerms, + StatementContraints contraints) throws QueryEvaluationException { + TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); + return temporalIndexer.queryInstantHasBeginningInterval(queryInterval, contraints); + } + + @Override + public String toString() { + return "TEMPORAL_InstantHasBeginningInterval"; + }; + }; + + private final SearchFunction TEMPORAL_InstantHasEndInterval = new SearchFunction() { + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, + StatementContraints contraints) throws QueryEvaluationException { + TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); + return temporalIndexer.queryInstantHasEndInterval(queryInterval, contraints); + } + + @Override + public String toString() { + return "TEMPORAL_InstantHasEndInterval"; + }; + }; + + { + + String TEMPORAL_NS = "tag:rya-rdf.org,2015:temporal#"; + + SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"after"), TEMPORAL_InstantAfterInstant); + SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"before"), TEMPORAL_InstantBeforeInstant); + SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"equals"), TEMPORAL_InstantEqualsInstant); + + SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"beforeInterval"), TEMPORAL_InstantBeforeInterval); + SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"afterInterval"), TEMPORAL_InstantAfterInterval); + SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"insideInterval"), TEMPORAL_InstantInsideInterval); + SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"hasBeginningInterval"), + TEMPORAL_InstantHasBeginningInterval); + SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"hasEndInterval"), TEMPORAL_InstantHasEndInterval); + + } + + + + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/external/ExternalIndexMain.java 
---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/ExternalIndexMain.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/ExternalIndexMain.java new file mode 100644 index 0000000..61627cf --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/ExternalIndexMain.java @@ -0,0 +1,218 @@ +package mvm.rya.indexing.external; + +/* + * #%L + * mvm.rya.indexing.accumulo + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.io.File; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet; +import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; +import mvm.rya.rdftriplestore.RdfCloudTripleStore; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.QueryResultHandlerException; +import org.openrdf.query.TupleQueryResultHandler; +import org.openrdf.query.TupleQueryResultHandlerException; +import org.openrdf.repository.sail.SailRepository; +import org.openrdf.repository.sail.SailRepositoryConnection; +import org.openrdf.sail.Sail; + +import com.beust.jcommander.internal.Maps; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +public class ExternalIndexMain { + + private static String userStr = ""; + private static String passStr = ""; + + private static String instStr = ""; + private static String zooStr = ""; + + private static String tablePrefix = ""; + + private static String AUTHS = ""; + + public static void main(String[] args) throws Exception { + Preconditions.checkArgument(args.length == 6, "java " + 
ExternalIndexMain.class.getCanonicalName() + + " sparqlFile cbinstance cbzk cbuser cbpassword rdfTablePrefix."); + + final String sparqlFile = args[0]; + + instStr = args[1]; + zooStr = args[2]; + userStr = args[3]; + passStr = args[4]; + tablePrefix = args[5]; + + String queryString = FileUtils.readFileToString(new File(sparqlFile)); + + + // Look for Extra Indexes + Instance inst = new ZooKeeperInstance(instStr, zooStr); + Connector c = inst.getConnector(userStr, passStr.getBytes()); + + System.out.println("Searching for Indexes"); + Map<String, String> indexTables = Maps.newLinkedHashMap(); + for (String table : c.tableOperations().list()) { + if (table.startsWith(tablePrefix + "INDEX_")) { + Scanner s = c.createScanner(table, new Authorizations()); + s.setRange(Range.exact(new Text("~SPARQL"))); + for (Entry<Key, Value> e : s) { + indexTables.put(table, e.getValue().toString()); + } + } + } + + List<ExternalTupleSet> index = Lists.newArrayList(); + + if (indexTables.isEmpty()) { + System.out.println("No Index found"); + } else { + for (String table : indexTables.keySet()) { + String indexSparqlString = indexTables.get(table); + System.out.println("====================== INDEX FOUND ======================"); + System.out.println(" table : " + table); + System.out.println(" sparql : "); + System.out.println(indexSparqlString); + + index.add(new AccumuloIndexSet(indexSparqlString, c, table)); + } + } + + // Connect to Rya + Sail s = getRyaSail(); + SailRepository repo = new SailRepository(s); + repo.initialize(); + + // Perform Query + + CountingTupleQueryResultHandler count = new CountingTupleQueryResultHandler(); + + SailRepositoryConnection conn; + if (index.isEmpty()) { + conn = repo.getConnection(); + + } else { + ExternalProcessor processor = new ExternalProcessor(index); + + Sail processingSail = new ExternalSail(s, processor); + SailRepository smartSailRepo = new SailRepository(processingSail); + smartSailRepo.initialize(); + + conn = 
smartSailRepo.getConnection(); + } + + startTime = System.currentTimeMillis(); + lastTime = startTime; + System.out.println("Query Started"); + conn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate(count); + + System.out.println("Count of Results found : " + count.i); + System.out.println("Total query time (s) : " + (System.currentTimeMillis() - startTime) / 1000.); + } + + static long lastTime = 0; + static long startTime = 0; + + private static class CountingTupleQueryResultHandler implements TupleQueryResultHandler { + public int i = 0; + + @Override + public void handleBoolean(boolean value) throws QueryResultHandlerException { + } + + @Override + public void handleLinks(List<String> linkUrls) throws QueryResultHandlerException { + } + + @Override + public void startQueryResult(List<String> bindingNames) throws TupleQueryResultHandlerException { + System.out.println("First Result Recieved (s) : " + (System.currentTimeMillis() - startTime) / 1000.); + } + + @Override + public void endQueryResult() throws TupleQueryResultHandlerException { + } + + @Override + public void handleSolution(BindingSet bindingSet) throws TupleQueryResultHandlerException { + i++; + if (i % 10 == 0) { + long mark = System.currentTimeMillis(); + System.out.println("Count : " + i + ". 
Time (s) : " + (mark - lastTime) / 1000.); + lastTime = mark; + } + + } + + } + + private static Configuration getConf() { + + Configuration conf = new Configuration(); + + conf.set(ConfigUtils.CLOUDBASE_USER, userStr); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, passStr); + + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instStr); + conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zooStr); + + conf.set(ConfigUtils.CLOUDBASE_AUTHS, AUTHS); + conf.setBoolean(ConfigUtils.DISPLAY_QUERY_PLAN, true); + return conf; + } + + private static Sail getRyaSail() throws AccumuloException, AccumuloSecurityException { + + Connector connector = ConfigUtils.getConnector(getConf()); + + final RdfCloudTripleStore store = new RdfCloudTripleStore(); + AccumuloRyaDAO crdfdao = new AccumuloRyaDAO(); + crdfdao.setConnector(connector); + + AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(getConf()); + conf.setTablePrefix(tablePrefix); + crdfdao.setConf(conf); + store.setRyaDAO(crdfdao); + + return store; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/external/ExternalProcessor.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/ExternalProcessor.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/ExternalProcessor.java new file mode 100644 index 0000000..b387806 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/ExternalProcessor.java @@ -0,0 +1,725 @@ +package mvm.rya.indexing.external; + +/* + * #%L + * mvm.rya.rya.indexing + * %% + * Copyright (C) 2014 - 2015 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import mvm.rya.indexing.external.QueryVariableNormalizer.VarCollector; +import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; + +import org.openrdf.query.algebra.BindingSetAssignment; +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.Join; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.QueryModelNode; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.algebra.helpers.StatementPatternCollector; + +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +/** + * Processes a {@link TupleExpr} and replaces sets of elements in the tree with {@link ExternalTupleSet} objects. + */ +public class ExternalProcessor { + + private List<ExternalTupleSet> indexSet; + + public ExternalProcessor(List<ExternalTupleSet> indexSet) { + this.indexSet = indexSet; + } + + /** + * Iterates through list of indexes and replaces all subtrees of query which match index with external tuple object built from index. 
+ * + * @param query + * @return TupleExpr + */ + public TupleExpr process(TupleExpr query) { + TupleExpr rtn = query.clone(); + + + //move BindingSetAssignment Nodes out of the way + organizeBSAs(rtn); + + + // test to see if query contains no other nodes + // than filter, join, projection, and statement pattern and + // test whether query contains duplicate StatementPatterns and filters + if (isTupleValid(rtn)) { + + for (ExternalTupleSet index : indexSet) { + + // test to see if index contains at least one StatementPattern, + // that StatementPatterns are unique, + // and that all variables found in filters occur in some + // StatementPattern + if (isTupleValid(index.getTupleExpr())) { + + List<TupleExpr> normalize = getMatches(rtn, index.getTupleExpr()); + + for (TupleExpr tup : normalize) { + ExternalTupleSet eTup = (ExternalTupleSet) index.clone(); + setTableMap(tup, eTup); + setSupportedVarOrderMap(eTup); + eTup.setProjectionExpr((Projection) tup); + SPBubbleDownVisitor indexVistor = new SPBubbleDownVisitor(eTup); + rtn.visit(indexVistor); + FilterBubbleManager fbmv = new FilterBubbleManager(eTup); + rtn.visit(fbmv); + SubsetEqualsVisitor subIndexVis = new SubsetEqualsVisitor(eTup); + rtn.visit(subIndexVis); + + } + } + + } + + return rtn; + } else { + throw new IllegalArgumentException("Invalid Query."); + } + } + + + private void setTableMap(TupleExpr tupleMatch, ExternalTupleSet index) { + + List<String> replacementVars = Lists.newArrayList(tupleMatch.getBindingNames()); + List<String> tableVars = Lists.newArrayList(index.getTupleExpr().getBindingNames()); + + Map<String,String> tableMap = Maps.newHashMap(); + + for(int i = 0; i < tableVars.size(); i++) { + tableMap.put(replacementVars.get(i), tableVars.get(i)); + } + index.setTableVarMap(tableMap); + + + } + + private void setSupportedVarOrderMap(ExternalTupleSet index) { + + Map<String, Set<String>> supportedVarOrders = Maps.newHashMap(); + BiMap<String, String> biMap = 
HashBiMap.create(index.getTableVarMap()).inverse(); + Map<String, Set<String>> oldSupportedVarOrders = index.getSupportedVariableOrderMap(); + + Set<String> temp = null; + Set<String> keys = oldSupportedVarOrders.keySet(); + + for (String s : keys) { + temp = oldSupportedVarOrders.get(s); + Set<String> newSet = Sets.newHashSet(); + + for (String t : temp) { + newSet.add(biMap.get(t)); + } + + String[] tempStrings = s.split("\u0000"); + String v = ""; + for(String u: tempStrings) { + if(v.length() == 0){ + v = v + biMap.get(u); + } else { + v = v + "\u0000" + biMap.get(u); + } + } + + supportedVarOrders.put(v, newSet); + + } + + index.setSupportedVariableOrderMap(supportedVarOrders); + + } + + + + private List<TupleExpr> getMatches(TupleExpr query, TupleExpr tuple) { + + try { + List<TupleExpr> list = QueryVariableNormalizer.getNormalizedIndex(query, tuple); + // System.out.println("Match list is " + list); + return list; + } catch (Exception e) { + System.out.println(e); + } + + return new ArrayList<TupleExpr>(); + + } + + // determines whether query is valid, which requires that a + // query must contain a StatementPattern, not contain duplicate + // Statement Patterns or Filters, not be comprised of only Projection, + // Join, StatementPattern, and Filter nodes, and that any variable + // appearing in a Filter must appear in a StatementPattern. 
+ private static boolean isTupleValid(QueryModelNode node) { + + ValidQueryVisitor vqv = new ValidQueryVisitor(); + node.visit(vqv); + + Set<String> spVars = getVarNames(getQNodes("sp", node)); + + if (vqv.isValid() && (spVars.size() > 0)) { + + FilterCollector fvis = new FilterCollector(); + node.visit(fvis); + List<QueryModelNode> fList = fvis.getFilters(); + return (fList.size() == Sets.newHashSet(fList).size() && getVarNames(fList).size() <= spVars.size()); + + } else { + return false; + } + } + + private static Set<QueryModelNode> getQNodes(QueryModelNode queryNode) { + Set<QueryModelNode> rtns = new HashSet<QueryModelNode>(); + + StatementPatternCollector spc = new StatementPatternCollector(); + queryNode.visit(spc); + rtns.addAll(spc.getStatementPatterns()); + + FilterCollector fvis = new FilterCollector(); + queryNode.visit(fvis); + rtns.addAll(fvis.getFilters()); + + ExternalTupleCollector eVis = new ExternalTupleCollector(); + queryNode.visit(eVis); + rtns.addAll(eVis.getExtTup()); + + return rtns; + } + + private static Set<QueryModelNode> getQNodes(String node, QueryModelNode queryNode) { + + if (node.equals("sp")) { + Set<QueryModelNode> eSet = new HashSet<QueryModelNode>(); + StatementPatternCollector spc = new StatementPatternCollector(); + queryNode.visit(spc); + List<StatementPattern> spList = spc.getStatementPatterns(); + eSet.addAll(spList); + // returns empty set if list contains duplicate StatementPatterns + if (spList.size() > eSet.size()) { + return Sets.newHashSet(); + } else { + return eSet; + } + } else if (node.equals("filter")) { + + FilterCollector fvis = new FilterCollector(); + queryNode.visit(fvis); + + return Sets.newHashSet(fvis.getFilters()); + } else { + + throw new IllegalArgumentException("Invalid node type."); + } + } + + // moves StatementPatterns in query that also occur in index to bottom of + // query tree. 
+ private static class SPBubbleDownVisitor extends QueryModelVisitorBase<RuntimeException> { + + private TupleExpr tuple; + private QueryModelNode indexQNode; + private Set<QueryModelNode> sSet = Sets.newHashSet(); + + public SPBubbleDownVisitor(ExternalTupleSet index) { + + this.tuple = index.getTupleExpr(); + indexQNode = ((Projection) tuple).getArg(); + sSet = getQNodes("sp", indexQNode); + + } + + public void meet(Projection node) { + // moves external tuples above statement patterns before attempting + // to bubble down index statement patterns found in query tree + + organizeExtTuples(node); + + super.meet(node); + } + + public void meet(Join node) { + // if right node contained in index, move it to bottom of query tree + if (sSet.contains(node.getRightArg())) { + + Set<QueryModelNode> eSet = getQNodes("sp", node); + Set<QueryModelNode> compSet = Sets.difference(eSet, sSet); + + if (eSet.containsAll(sSet)) { + + QNodeExchanger qne = new QNodeExchanger(node.getRightArg(), compSet); + node.visit(qne); + node.replaceChildNode(node.getRightArg(), qne.getReplaced()); + + super.meet(node); + } + return; + } + // if left node contained in index, move it to bottom of query tree + else if (sSet.contains(node.getLeftArg())) { + + Set<QueryModelNode> eSet = getQNodes("sp", node); + Set<QueryModelNode> compSet = Sets.difference(eSet, sSet); + + if (eSet.containsAll(sSet)) { + + QNodeExchanger qne = new QNodeExchanger(node.getLeftArg(), compSet); + node.visit(qne); + node.replaceChildNode(node.getLeftArg(), qne.getReplaced()); + + super.meet(node); + } + return; + + } else { + super.meet(node); + } + + } + + // moves all ExternalTupleSets in query tree above remaining + // StatementPatterns + private static void organizeExtTuples(QueryModelNode node) { + + ExternalTupleCollector eVis = new ExternalTupleCollector(); + node.visit(eVis); + + ExtTupleExchangeVisitor oev = new ExtTupleExchangeVisitor(eVis.getExtTup()); + node.visit(oev); + } + + } + + // given a replacement 
QueryModelNode and compSet, this visitor replaces the + // first + // element in the query tree that occurs in compSet with replacement and + // returns + // the element that was replaced. + private static class QNodeExchanger extends QueryModelVisitorBase<RuntimeException> { + + private QueryModelNode toBeReplaced; + private QueryModelNode replacement; + private Set<QueryModelNode> compSet; + + public QNodeExchanger(QueryModelNode replacement, Set<QueryModelNode> compSet) { + this.replacement = replacement; + this.toBeReplaced = replacement; + this.compSet = compSet; + } + + public QueryModelNode getReplaced() { + return toBeReplaced; + } + + public void meet(Join node) { + + if (compSet.contains(node.getRightArg())) { + this.toBeReplaced = node.getRightArg(); + node.replaceChildNode(node.getRightArg(), replacement); + return; + } else if (compSet.contains(node.getLeftArg())) { + this.toBeReplaced = node.getLeftArg(); + node.replaceChildNode(node.getLeftArg(), replacement); + return; + } else { + super.meet(node); + } + + } + + } + + // moves filter that occurs in both query and index down the query tree so + // that that it is positioned + // above statement patterns associated with index. Precondition for calling + // this method is that + // SPBubbleDownVisitor has been called to position index StatementPatterns + // within query tree. 
+ //TODO this visitor assumes that all filters are positioned at top of query tree + //could lead to problems if filter optimizer called before external processor + private static class FilterBubbleDownVisitor extends QueryModelVisitorBase<RuntimeException> { + + private QueryModelNode filter; + private Set<QueryModelNode> compSet; + private boolean filterPlaced = false; + + public FilterBubbleDownVisitor(QueryModelNode filter, Set<QueryModelNode> compSet) { + this.filter = filter; + this.compSet = compSet; + + } + + public boolean filterPlaced() { + return filterPlaced; + } + + public void meet(Join node) { + + if (!compSet.contains(node.getRightArg())) { + // looks for placed to position filter node. if right node is + // contained in index + // and left node is statement pattern node contained in index or + // is a join, place + // filter above join. + if (node.getLeftArg() instanceof Join || !(compSet.contains(node.getLeftArg()))) { + + QueryModelNode pNode = node.getParentNode(); + ((Filter) filter).setArg(node); + pNode.replaceChildNode(node, filter); + filterPlaced = true; + + return; + } // otherwise place filter below join and above right arg + else { + ((Filter) filter).setArg(node.getRightArg()); + node.replaceChildNode(node.getRightArg(), filter); + filterPlaced = true; + return; + + } + } else if ((node.getLeftArg() instanceof StatementPattern) && !compSet.contains(node.getLeftArg())) { + + ((Filter) filter).setArg(node.getLeftArg()); + node.replaceChildNode(node.getLeftArg(), filter); + filterPlaced = true; + + return; + } else { + super.meet(node); + } + } + + } + + private static Set<String> getVarNames(Collection<QueryModelNode> nodes) { + + List<String> tempVars; + Set<String> nodeVarNames = Sets.newHashSet(); + + for (QueryModelNode s : nodes) { + tempVars = VarCollector.process(s); + for (String t : tempVars) + nodeVarNames.add(t); + } + return nodeVarNames; + + } + + // visitor which determines whether or not to reposition a filter by calling + 
// FilterBubbleDownVisitor + private static class FilterBubbleManager extends QueryModelVisitorBase<RuntimeException> { + + private TupleExpr tuple; + private QueryModelNode indexQNode; + private Set<QueryModelNode> sSet = Sets.newHashSet(); + private Set<QueryModelNode> bubbledFilters = Sets.newHashSet(); + + public FilterBubbleManager(ExternalTupleSet index) { + this.tuple = index.getTupleExpr(); + indexQNode = ((Projection) tuple).getArg(); + sSet = getQNodes(indexQNode); + + } + + public void meet(Filter node) { + + Set<QueryModelNode> eSet = getQNodes(node); + Set<QueryModelNode> compSet = Sets.difference(eSet, sSet); + + // if index contains filter node and it hasn't already been moved, + // move it down + // query tree just above position of statement pattern nodes found + // in both query tree + // and index (assuming that SPBubbleDownVisitor has already been + // called) + if (sSet.contains(node.getCondition()) && !bubbledFilters.contains(node.getCondition())) { + FilterBubbleDownVisitor fbdv = new FilterBubbleDownVisitor((Filter) node.clone(), compSet); + node.visit(fbdv); + bubbledFilters.add(node.getCondition()); + // checks if filter correctly placed, and if it has been, + // removes old copy of filter + if (fbdv.filterPlaced()) { + + QueryModelNode pNode = node.getParentNode(); + TupleExpr cNode = node.getArg(); + pNode.replaceChildNode(node, cNode); + + + super.meetNode(pNode); + } + super.meet(node); + + } else { + super.meet(node); + } + } + } + + // iterates through the query tree and attempts to match subtrees with + // index. When a match is + // found, the subtree is replaced by an ExternalTupleSet formed from the + // index. Pre-condition for + // calling this method is that both SPBubbleDownVisitor and + // FilterBubbleManager have been called + // to position the StatementPatterns and Filters. 
/**
 * Walks the query tree attempting to match the index's node set (sSet)
 * against subtrees. When a subtree's nodes exactly equal the index's nodes,
 * the subtree is replaced by the ExternalTupleSet; when the subtree is a
 * strict superset, the visitor descends looking for a smaller match.
 * Pre-condition (per the comment above): SPBubbleDownVisitor and
 * FilterBubbleManager have already grouped the relevant StatementPatterns
 * and Filters together.
 */
private static class SubsetEqualsVisitor extends QueryModelVisitorBase<RuntimeException> {

    // Tuple expression of the index.
    private TupleExpr tuple;
    // Body of the index query (the Projection's argument).
    private QueryModelNode indexQNode;
    // The external tuple set that replaces a matched subtree.
    private ExternalTupleSet set;
    // Nodes comprising the index query; built by getQNodes (defined elsewhere).
    private Set<QueryModelNode> sSet = Sets.newHashSet();

    public SubsetEqualsVisitor(ExternalTupleSet index) {
        this.tuple = index.getTupleExpr();
        this.set = index;
        indexQNode = ((Projection) tuple).getArg();
        sSet = getQNodes(indexQNode);

    }

    public void meet(Join node) {

        Set<QueryModelNode> eSet = getQNodes(node);

        if (eSet.containsAll(sSet) && !(node.getRightArg() instanceof BindingSetAssignment)) {

//            System.out.println("Eset is " + eSet + " and sSet is " + sSet);

            if (eSet.equals(sSet)) {
                // exact match: swap the entire join subtree for the external tuple set
                node.replaceWith(set);
                return;
            } else {
                // single-node index: replace whichever child of the join matches
                if (node.getLeftArg() instanceof StatementPattern && sSet.size() == 1) {
                    if(sSet.contains(node.getLeftArg())) {
                        node.setLeftArg(set);
                    } else if(sSet.contains(node.getRightArg())) {
                        node.setRightArg(set);
                    } else {
                        return;
                    }
                }
                else {
                    // strict superset: descend to find a tighter match
                    super.meet(node);
                }
            }
        } else if (eSet.containsAll(sSet)) {

            super.meet(node);

        }

    }
    //TODO might need to include BindingSetAssignment Condition here
    //to account for index consisting of only filter and BindingSetAssignment nodes
    public void meet(Filter node) {

        Set<QueryModelNode> eSet = getQNodes(node);

        if (eSet.containsAll(sSet)) {

            if (eSet.equals(sSet)) {
                // exact match at a filter: replace the whole filter subtree
                node.replaceWith(set);
                return;
            } else {
                super.meet(node);
            }
        }
    }
}

// visitor which determines whether a query is valid (i.e.
it does not
// contain nodes other than
// Projection, Join, Filter, StatementPattern )
/**
 * Checks that a query tree consists only of node types the external
 * processor supports: Projection, Join, Filter, StatementPattern,
 * BindingSetAssignment, and Var. Any other node type marks the query invalid.
 */
private static class ValidQueryVisitor extends QueryModelVisitorBase<RuntimeException> {

    // Flips to false on the first unsupported node type encountered.
    private boolean isValid = true;

    public boolean isValid() {
        return isValid;
    }

    public void meet(Projection node) {
        node.getArg().visit(this);
    }

    public void meet(Filter node) {
        // only the filter's argument is checked; the condition itself is skipped
        node.getArg().visit(this);
    }





    public void meetNode(QueryModelNode node) {

        if (!((node instanceof Join) || (node instanceof StatementPattern) || (node instanceof BindingSetAssignment) || (node instanceof Var))) {
            isValid = false;
            return;

        } else{
            super.meetNode(node);
        }
    }

}

// repositions ExternalTuples above StatementPatterns within query tree
private static class ExtTupleExchangeVisitor extends QueryModelVisitorBase<RuntimeException> {

    // ExternalTupleSet nodes known to exist somewhere in the tree.
    private Set<QueryModelNode> extTuples;

    public ExtTupleExchangeVisitor(Set<QueryModelNode> extTuples) {
        this.extTuples = extTuples;
    }

    public void meet(Join queryNode) {

        // if query tree contains external tuples and they are not
        // positioned above statement pattern node
        // reposition
        if (this.extTuples.size() > 0 && !(queryNode.getRightArg() instanceof ExternalTupleSet)
                && !(queryNode.getRightArg() instanceof BindingSetAssignment)) {
            // QNodeExchanger (defined elsewhere) swaps the current right arg with
            // a node from extTuples found lower in the tree — confirm semantics
            QNodeExchanger qnev = new QNodeExchanger((QueryModelNode) queryNode.getRightArg(), this.extTuples);
            queryNode.visit(qnev);
            queryNode.setRightArg((TupleExpr)qnev.getReplaced());
            super.meet(queryNode);
        } else {
            super.meet(queryNode);
        }

    }

}

// collects every ExternalTupleSet node currently present in the tree
private static class ExternalTupleCollector extends QueryModelVisitorBase<RuntimeException> {

    private Set<QueryModelNode> eSet = new HashSet<QueryModelNode>();

    @Override
    public void meetNode(QueryModelNode node) throws RuntimeException {
        if (node instanceof ExternalTupleSet) {
            eSet.add(node);
        }
        super.meetNode(node);
    }

    /** @return the ExternalTupleSet nodes found during the visit */
    public Set<QueryModelNode> getExtTup() {
        return eSet;
    }

}

// collects the conditions (not the Filter nodes themselves) of all filters in the tree
private static class FilterCollector extends QueryModelVisitorBase<RuntimeException> {

    private List<QueryModelNode> filterList = Lists.newArrayList();

    public List<QueryModelNode> getFilters() {
        return filterList;
    }

    @Override
    public void meet(Filter node) {
        filterList.add(node.getCondition());
        super.meet(node);
    }

}

// if the tree contains BindingSetAssignments, repositions them via
// BindingSetAssignmentExchangeVisitor
private static void organizeBSAs(QueryModelNode node) {

    BindingSetAssignmentCollector bsac = new BindingSetAssignmentCollector();
    node.visit(bsac);

    if (bsac.containsBSAs()) {
        Set<QueryModelNode> bsaSet = bsac.getBindingSetAssignments();
        BindingSetAssignmentExchangeVisitor bsaev = new BindingSetAssignmentExchangeVisitor(bsaSet);
        node.visit(bsaev);
    }
}

// repositions BindingSetAssignments above StatementPatterns within query tree
private static class BindingSetAssignmentExchangeVisitor extends QueryModelVisitorBase<RuntimeException> {

    // BindingSetAssignment nodes known to exist somewhere in the tree.
    private Set<QueryModelNode> bsas;

    public BindingSetAssignmentExchangeVisitor(Set<QueryModelNode> bsas) {
        this.bsas = bsas;
    }

    public void meet(Join queryNode) {

        // if query tree contains binding set assignments and they are not
        // positioned above statement pattern node
        // reposition
        if (this.bsas.size() > 0 && !(queryNode.getRightArg() instanceof BindingSetAssignment)) {
            QNodeExchanger qnev = new QNodeExchanger((QueryModelNode) queryNode.getRightArg(), bsas);
            queryNode.visit(qnev);
            queryNode.replaceChildNode(queryNode.getRightArg(), qnev.getReplaced());
            super.meet(queryNode);
        } else {
            super.meet(queryNode);
        }

    }

}


// collects every BindingSetAssignment node present in the tree
public static class BindingSetAssignmentCollector extends QueryModelVisitorBase<RuntimeException> {

    private Set<QueryModelNode> bindingSetList = Sets.newHashSet();

    public Set<QueryModelNode> getBindingSetAssignments() {
        return bindingSetList;
    }

    /** @return true if at least one BindingSetAssignment was found */
    public boolean containsBSAs() {
        return (bindingSetList.size() > 0);
    }

    @Override
    public void meet(BindingSetAssignment node) {
        bindingSetList.add(node);
        super.meet(node);
    }

}

// TODO insert BindingSetAssignments at bottom of query tree --this approach assumes
// BindingSetAssignments always removed during creation of ExternalTupleSets within
// query. There may be cases where this precondition does not hold (all BindingSetAssignments
// not removed). For now assuming it always holds.

}
