http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java index 12791a6..be29d3c 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java @@ -8,9 +8,9 @@ package mvm.rya.indexing.accumulo.temporal; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -67,16 +67,17 @@ import mvm.rya.api.RdfCloudTripleStoreConfiguration; import mvm.rya.api.domain.RyaStatement; import mvm.rya.api.resolver.RyaToRdfConversions; import mvm.rya.indexing.KeyParts; -import mvm.rya.indexing.StatementContraints; +import mvm.rya.indexing.StatementConstraints; +import mvm.rya.indexing.StatementSerializer; import mvm.rya.indexing.TemporalIndexer; import mvm.rya.indexing.TemporalInstant; +import mvm.rya.indexing.TemporalInstantRfc3339; import mvm.rya.indexing.TemporalInterval; import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.accumulo.StatementSerializer; public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements TemporalIndexer { - private static final Logger logger = Logger.getLogger(AccumuloTemporalIndexer.class); + private static final Logger logger = 
Logger.getLogger(AccumuloTemporalIndexer.class); private static final String CF_INTERVAL = "interval"; @@ -115,22 +116,22 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements //initialization occurs in setConf because index is created using reflection @Override - public void setConf(Configuration conf) { + public void setConf(final Configuration conf) { this.conf = conf; if (!isInit) { try { initInternal(); isInit = true; - } catch (AccumuloException e) { + } catch (final AccumuloException e) { logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); throw new RuntimeException(e); - } catch (AccumuloSecurityException e) { + } catch (final AccumuloSecurityException e) { logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); throw new RuntimeException(e); - } catch (TableNotFoundException e) { + } catch (final TableNotFoundException e) { logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); throw new RuntimeException(e); - } catch (TableExistsException e) { + } catch (final TableExistsException e) { logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); throw new RuntimeException(e); } @@ -139,7 +140,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements @Override public Configuration getConf() { - return this.conf; + return conf; } @@ -152,34 +153,36 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements * if that fails, tries: org.joda.time.DateTime.parse() . * T O D O parse an interval using multiple predicates for same subject -- ontology dependent. */ - private void storeStatement(Statement statement) throws IOException, IllegalArgumentException { + private void storeStatement(final Statement statement) throws IOException, IllegalArgumentException { // if the predicate list is empty, accept all predicates. 
// Otherwise, make sure the predicate is on the "valid" list - boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); - if (!isValidPredicate || !(statement.getObject() instanceof Literal)) + final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); + if (!isValidPredicate || !(statement.getObject() instanceof Literal)) { return; - DateTime[] indexDateTimes = new DateTime[2]; // 0 begin, 1 end of interval + } + final DateTime[] indexDateTimes = new DateTime[2]; // 0 begin, 1 end of interval extractDateTime(statement, indexDateTimes); - if (indexDateTimes[0]==null) - return; + if (indexDateTimes[0]==null) { + return; + } // Add this as an instant, or interval. try { if (indexDateTimes[1] != null) { - TemporalInterval interval = new TemporalInterval(new TemporalInstantRfc3339(indexDateTimes[0]), new TemporalInstantRfc3339(indexDateTimes[1])); + final TemporalInterval interval = new TemporalInterval(new TemporalInstantRfc3339(indexDateTimes[0]), new TemporalInstantRfc3339(indexDateTimes[1])); addInterval(temporalIndexBatchWriter, interval, statement); } else { - TemporalInstant instant = new TemporalInstantRfc3339(indexDateTimes[0]); + final TemporalInstant instant = new TemporalInstantRfc3339(indexDateTimes[0]); addInstant(temporalIndexBatchWriter, instant, statement); } - } catch (MutationsRejectedException e) { + } catch (final MutationsRejectedException e) { throw new IOException("While adding interval/instant for statement =" + statement, e); } } @Override - public void storeStatement(RyaStatement statement) throws IllegalArgumentException, IOException { + public void storeStatement(final RyaStatement statement) throws IllegalArgumentException, IOException { storeStatement(RyaToRdfConversions.convertStatement(statement)); } @@ -191,45 +194,46 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements * @param statement * @param 
outputDateTimes */ - private void extractDateTime(Statement statement, DateTime[] outputDateTimes) { - if (!(statement.getObject() instanceof Literal)) // Error since it should already be tested by caller. + private void extractDateTime(final Statement statement, final DateTime[] outputDateTimes) { + if (!(statement.getObject() instanceof Literal)) { throw new RuntimeException("Statement's object must be a literal: " + statement); + } // throws IllegalArgumentException NumberFormatException if can't parse - String logThis = null; Literal literalValue = (Literal) statement.getObject(); + String logThis = null; final Literal literalValue = (Literal) statement.getObject(); // First attempt to parse a interval in the form "[date1,date2]" - Matcher matcher = Pattern.compile("\\[(.*)\\,(.*)\\].*").matcher(literalValue.stringValue()); + final Matcher matcher = Pattern.compile("\\[(.*)\\,(.*)\\].*").matcher(literalValue.stringValue()); if (matcher.find()) { - try { - // Got a datetime pair, parse into an interval. - outputDateTimes[0] = new DateTime(matcher.group(1)); - outputDateTimes[1] = new DateTime(matcher.group(2)); - return; - } catch (java.lang.IllegalArgumentException e) { + try { + // Got a datetime pair, parse into an interval. 
+ outputDateTimes[0] = new DateTime(matcher.group(1)); + outputDateTimes[1] = new DateTime(matcher.group(2)); + return; + } catch (final java.lang.IllegalArgumentException e) { logThis = e.getMessage() + " " + logThis; outputDateTimes[0]=null; outputDateTimes[1]=null; - } - } - - try { - XMLGregorianCalendar calendarValue = literalValue.calendarValue(); - outputDateTimes[0] = new DateTime(calendarValue.toGregorianCalendar()); - outputDateTimes[1] = null; - return; - } catch (java.lang.IllegalArgumentException e) { + } + } + + try { + final XMLGregorianCalendar calendarValue = literalValue.calendarValue(); + outputDateTimes[0] = new DateTime(calendarValue.toGregorianCalendar()); + outputDateTimes[1] = null; + return; + } catch (final java.lang.IllegalArgumentException e) { logThis = e.getMessage(); - } - // Try again using Joda Time DateTime.parse() - try { - outputDateTimes[0] = DateTime.parse(literalValue.stringValue()); - outputDateTimes[1] = null; - //System.out.println(">>>>>>>Joda parsed: "+literalValue.stringValue()); - return; - } catch (java.lang.IllegalArgumentException e) { + } + // Try again using Joda Time DateTime.parse() + try { + outputDateTimes[0] = DateTime.parse(literalValue.stringValue()); + outputDateTimes[1] = null; + //System.out.println(">>>>>>>Joda parsed: "+literalValue.stringValue()); + return; + } catch (final java.lang.IllegalArgumentException e) { logThis = e.getMessage() + " " + logThis; - } + } logger.warn("TemporalIndexer is unable to parse the date/time from statement=" + statement.toString() + " " +logThis); - return; + return; } /** @@ -240,10 +244,10 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements * @param interval * @throws MutationsRejectedException */ - public void removeInterval(BatchWriter writer, TemporalInterval interval, Statement statement) throws MutationsRejectedException { - Text cf = new Text(StatementSerializer.writeContext(statement)); - Text cqBegin = new Text(KeyParts.CQ_BEGIN); 
- Text cqEnd = new Text(KeyParts.CQ_END); + public void removeInterval(final BatchWriter writer, final TemporalInterval interval, final Statement statement) throws MutationsRejectedException { + final Text cf = new Text(StatementSerializer.writeContext(statement)); + final Text cqBegin = new Text(KeyParts.CQ_BEGIN); + final Text cqEnd = new Text(KeyParts.CQ_END); // Start Begin index Text keyText = new Text(interval.getAsKeyBeginning()); @@ -268,10 +272,10 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements * @param instant * @throws MutationsRejectedException */ - public void removeInstant(BatchWriter writer, TemporalInstant instant, Statement statement) throws MutationsRejectedException { - KeyParts keyParts = new KeyParts(statement, instant); - for (KeyParts k: keyParts) { - Mutation m = new Mutation(k.getStoreKey()); + public void removeInstant(final BatchWriter writer, final TemporalInstant instant, final Statement statement) throws MutationsRejectedException { + final KeyParts keyParts = new KeyParts(statement, instant); + for (final KeyParts k: keyParts) { + final Mutation m = new Mutation(k.getStoreKey()); m.putDelete(k.cf, k.cq); writer.addMutation(m); } @@ -285,12 +289,12 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements * @param interval * @throws MutationsRejectedException */ - public void addInterval(BatchWriter writer, TemporalInterval interval, Statement statement) throws MutationsRejectedException { + public void addInterval(final BatchWriter writer, final TemporalInterval interval, final Statement statement) throws MutationsRejectedException { - Value statementValue = new Value(StringUtils.getBytesUtf8(StatementSerializer.writeStatement(statement))); - Text cf = new Text(StatementSerializer.writeContext(statement)); - Text cqBegin = new Text(KeyParts.CQ_BEGIN); - Text cqEnd = new Text(KeyParts.CQ_END); + final Value statementValue = new 
Value(StringUtils.getBytesUtf8(StatementSerializer.writeStatement(statement))); + final Text cf = new Text(StatementSerializer.writeContext(statement)); + final Text cqBegin = new Text(KeyParts.CQ_BEGIN); + final Text cqEnd = new Text(KeyParts.CQ_END); // Start Begin index Text keyText = new Text(interval.getAsKeyBeginning()); @@ -321,10 +325,10 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements * @param instant * @throws MutationsRejectedException */ - public void addInstant(BatchWriter writer, TemporalInstant instant, Statement statement) throws MutationsRejectedException { - KeyParts keyParts = new KeyParts(statement, instant); - for (KeyParts k : keyParts) { - Mutation m = new Mutation(k.getStoreKey()); + public void addInstant(final BatchWriter writer, final TemporalInstant instant, final Statement statement) throws MutationsRejectedException { + final KeyParts keyParts = new KeyParts(statement, instant); + for (final KeyParts k : keyParts) { + final Mutation m = new Mutation(k.getStoreKey()); m.put(k.cf, k.cq,k.getValue()); writer.addMutation(m); } @@ -339,16 +343,16 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements * @throws IOException */ private Scanner getScanner() throws QueryEvaluationException { - String whileDoing = "While creating a scanner for a temporal query. table name=" + temporalIndexTableName; + final String whileDoing = "While creating a scanner for a temporal query. 
table name=" + temporalIndexTableName; Scanner scanner = null; try { scanner = ConfigUtils.createScanner(temporalIndexTableName, conf); - } catch (AccumuloException e) { + } catch (final AccumuloException e) { logger.error(whileDoing, e); throw new QueryEvaluationException(whileDoing, e); - } catch (AccumuloSecurityException e) { + } catch (final AccumuloSecurityException e) { throw new QueryEvaluationException(whileDoing, e); - } catch (TableNotFoundException e) { + } catch (final TableNotFoundException e) { logger.error(whileDoing, e); throw new QueryEvaluationException(whileDoing + " The temporal index table should have been created by this constructor, if found missing.", e); @@ -356,21 +360,21 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements return scanner; } - private BatchScanner getBatchScanner() throws QueryEvaluationException { - String whileDoing = "While creating a Batch scanner for a temporal query. table name=" + temporalIndexTableName; - try { - return ConfigUtils.createBatchScanner(temporalIndexTableName, conf); - } catch (AccumuloException e) { - logger.error(whileDoing, e); - throw new QueryEvaluationException(whileDoing, e); - } catch (AccumuloSecurityException e) { - throw new QueryEvaluationException(whileDoing, e); - } catch (TableNotFoundException e) { - logger.error(whileDoing, e); - throw new QueryEvaluationException(whileDoing - + " The temporal index table should have been created by this constructor, if found missing. ", e); - } - } + private BatchScanner getBatchScanner() throws QueryEvaluationException { + final String whileDoing = "While creating a Batch scanner for a temporal query. 
table name=" + temporalIndexTableName; + try { + return ConfigUtils.createBatchScanner(temporalIndexTableName, conf); + } catch (final AccumuloException e) { + logger.error(whileDoing, e); + throw new QueryEvaluationException(whileDoing, e); + } catch (final AccumuloSecurityException e) { + throw new QueryEvaluationException(whileDoing, e); + } catch (final TableNotFoundException e) { + logger.error(whileDoing, e); + throw new QueryEvaluationException(whileDoing + + " The temporal index table should have been created by this constructor, if found missing. ", e); + } + } /** @@ -378,18 +382,18 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements */ @Override public CloseableIteration<Statement, QueryEvaluationException> queryInstantEqualsInstant( - TemporalInstant queryInstant, StatementContraints constraints) + final TemporalInstant queryInstant, final StatementConstraints constraints) throws QueryEvaluationException { // get rows where the repository time is equal to the given time in queryInstant. - Query query = new Query() { - @Override - public Range getRange(KeyParts keyParts) { - //System.out.println("Scanning queryInstantEqualsInstant: prefix:" + KeyParts.toHumanString(keyParts.getQueryKey())); - return Range.prefix(keyParts.getQueryKey()); // <-- specific logic - } - }; - ScannerBase scanner = query.doQuery(queryInstant, constraints); - // TODO currently context constraints are filtered on the client. + final Query query = new Query() { + @Override + public Range getRange(final KeyParts keyParts) { + //System.out.println("Scanning queryInstantEqualsInstant: prefix:" + KeyParts.toHumanString(keyParts.getQueryKey())); + return Range.prefix(keyParts.getQueryKey()); // <-- specific logic + } + }; + final ScannerBase scanner = query.doQuery(queryInstant, constraints); + // TODO currently context constraints are filtered on the client. 
return getContextIteratorWrapper(scanner, constraints.getContext()); } @@ -398,24 +402,25 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements */ @Override public CloseableIteration<Statement, QueryEvaluationException> queryInstantBeforeInstant( - TemporalInstant queryInstant, StatementContraints constraints) + final TemporalInstant queryInstant, final StatementConstraints constraints) throws QueryEvaluationException { // get rows where the repository time is before the given time. - Query query = new Query() { - @Override - public Range getRange(KeyParts keyParts) { - Text start= null; - if (keyParts.constraintPrefix != null ) // Yes, has constraints - start = keyParts.constraintPrefix; // <-- start specific logic - else - start = new Text(KeyParts.HASH_PREFIX_FOLLOWING); - Text endAt = keyParts.getQueryKey(); // <-- end specific logic - //System.out.println("Scanning queryInstantBeforeInstant: from:" + KeyParts.toHumanString(start) + " up to:" + KeyParts.toHumanString(endAt)); - return new Range(start, true, endAt, false); - } - }; - ScannerBase scanner = query.doQuery(queryInstant, constraints); - return getContextIteratorWrapper(scanner, constraints.getContext()); + final Query query = new Query() { + @Override + public Range getRange(final KeyParts keyParts) { + Text start= null; + if (keyParts.constraintPrefix != null ) { + start = keyParts.constraintPrefix; // <-- start specific logic + } else { + start = new Text(KeyParts.HASH_PREFIX_FOLLOWING); + } + final Text endAt = keyParts.getQueryKey(); // <-- end specific logic + //System.out.println("Scanning queryInstantBeforeInstant: from:" + KeyParts.toHumanString(start) + " up to:" + KeyParts.toHumanString(endAt)); + return new Range(start, true, endAt, false); + } + }; + final ScannerBase scanner = query.doQuery(queryInstant, constraints); + return getContextIteratorWrapper(scanner, constraints.getContext()); } /** @@ -423,29 +428,30 @@ public class AccumuloTemporalIndexer extends 
AbstractAccumuloIndexer implements */ @Override public CloseableIteration<Statement, QueryEvaluationException> queryInstantAfterInstant( - TemporalInstant queryInstant, StatementContraints constraints) + final TemporalInstant queryInstant, final StatementConstraints constraints) throws QueryEvaluationException { - Query query = new Query() { - @Override - public Range getRange(KeyParts keyParts) { - Text start = Range.followingPrefix(keyParts.getQueryKey()); // <-- specific logic - Text endAt = null; // no constraints // <-- specific logic - if (keyParts.constraintPrefix != null ) // Yes, has constraints - endAt = Range.followingPrefix(keyParts.constraintPrefix); - //System.out.println("Scanning queryInstantAfterInstant from after:" + KeyParts.toHumanString(start) + " up to:" + KeyParts.toHumanString(endAt)); - return new Range(start, true, endAt, false); - } - }; - ScannerBase scanner = query.doQuery(queryInstant, constraints); - return getContextIteratorWrapper(scanner, constraints.getContext()); - } - - /** + final Query query = new Query() { + @Override + public Range getRange(final KeyParts keyParts) { + final Text start = Range.followingPrefix(keyParts.getQueryKey()); // <-- specific logic + Text endAt = null; // no constraints // <-- specific logic + if (keyParts.constraintPrefix != null ) { + endAt = Range.followingPrefix(keyParts.constraintPrefix); + } + //System.out.println("Scanning queryInstantAfterInstant from after:" + KeyParts.toHumanString(start) + " up to:" + KeyParts.toHumanString(endAt)); + return new Range(start, true, endAt, false); + } + }; + final ScannerBase scanner = query.doQuery(queryInstant, constraints); + return getContextIteratorWrapper(scanner, constraints.getContext()); + } + + /** * Get instances before a given interval. Returns queryInstantBeforeInstant with the interval's beginning time. 
*/ @Override public CloseableIteration<Statement, QueryEvaluationException> queryInstantBeforeInterval( - TemporalInterval givenInterval, StatementContraints contraints) + final TemporalInterval givenInterval, final StatementConstraints contraints) throws QueryEvaluationException { return queryInstantBeforeInstant(givenInterval.getHasBeginning(), contraints); } @@ -455,7 +461,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements */ @Override public CloseableIteration<Statement, QueryEvaluationException> queryInstantAfterInterval( - TemporalInterval givenInterval, StatementContraints contraints) throws QueryEvaluationException { + final TemporalInterval givenInterval, final StatementConstraints contraints) throws QueryEvaluationException { return queryInstantAfterInstant(givenInterval.getHasEnd(), contraints); } @@ -465,30 +471,30 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements * exclusive (don't match the beginning and ending). */ @Override - public CloseableIteration<Statement, QueryEvaluationException> queryInstantInsideInterval( - TemporalInterval queryInterval, StatementContraints constraints) - throws QueryEvaluationException { - // get rows where the time is after the given interval's beginning time and before the ending time. 
- final TemporalInterval theQueryInterval = queryInterval; - Query query = new Query() { - private final TemporalInterval queryInterval = theQueryInterval; - @Override - public Range getRange(KeyParts keyParts) { - Text start = Range.followingPrefix(new Text(keyParts.getQueryKey(queryInterval.getHasBeginning()))); - Text endAt = new Text(keyParts.getQueryKey(queryInterval.getHasEnd())); // <-- end specific logic - //System.out.println("Scanning queryInstantInsideInterval: from excluding:" + KeyParts.toHumanString(start) + " up to:" + KeyParts.toHumanString(endAt)); - return new Range(start, false, endAt, false); - } - }; - ScannerBase scanner = query.doQuery(queryInterval.getHasBeginning(), constraints); - return getContextIteratorWrapper(scanner, constraints.getContext()); - } + public CloseableIteration<Statement, QueryEvaluationException> queryInstantInsideInterval( + final TemporalInterval queryInterval, final StatementConstraints constraints) + throws QueryEvaluationException { + // get rows where the time is after the given interval's beginning time and before the ending time. + final TemporalInterval theQueryInterval = queryInterval; + final Query query = new Query() { + private final TemporalInterval queryInterval = theQueryInterval; + @Override + public Range getRange(final KeyParts keyParts) { + final Text start = Range.followingPrefix(new Text(keyParts.getQueryKey(queryInterval.getHasBeginning()))); + final Text endAt = new Text(keyParts.getQueryKey(queryInterval.getHasEnd())); // <-- end specific logic + //System.out.println("Scanning queryInstantInsideInterval: from excluding:" + KeyParts.toHumanString(start) + " up to:" + KeyParts.toHumanString(endAt)); + return new Range(start, false, endAt, false); + } + }; + final ScannerBase scanner = query.doQuery(queryInterval.getHasBeginning(), constraints); + return getContextIteratorWrapper(scanner, constraints.getContext()); + } /** * Get instances matching the beginning of a given interval. 
*/ @Override public CloseableIteration<Statement, QueryEvaluationException> queryInstantHasBeginningInterval( - TemporalInterval queryInterval, StatementContraints contraints) + final TemporalInterval queryInterval, final StatementConstraints contraints) throws QueryEvaluationException { return queryInstantEqualsInstant(queryInterval.getHasBeginning(), contraints); } @@ -498,7 +504,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements */ @Override public CloseableIteration<Statement, QueryEvaluationException> queryInstantHasEndInterval( - TemporalInterval queryInterval, StatementContraints contraints) + final TemporalInterval queryInterval, final StatementConstraints contraints) throws QueryEvaluationException { return queryInstantEqualsInstant(queryInterval.getHasEnd(), contraints); } @@ -509,116 +515,121 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements * Currently predicate and subject constraints are filtered on the client. */ @Override - public CloseableIteration<Statement, QueryEvaluationException> queryIntervalEquals( - TemporalInterval query, StatementContraints contraints) - throws QueryEvaluationException { - Scanner scanner = getScanner(); - if (scanner != null) { - // get rows where the start and end match. - Range range = Range.prefix(new Text(query.getAsKeyBeginning())); - scanner.setRange(range); - if (contraints.hasContext()) - scanner.fetchColumn(new Text(contraints.getContext().toString()), new Text(KeyParts.CQ_BEGIN)); - else - scanner.fetchColumn(new Text(""), new Text(KeyParts.CQ_BEGIN)); - } - // Iterator<Entry<Key, Value>> iter = scanner.iterator(); - // while (iter.hasNext()) { - // System.out.println("queryIntervalEquals results:"+iter.next()); - // } - //return getConstrainedIteratorWrapper(scanner, contraints); - return getIteratorWrapper(scanner); - } - - /** - * find intervals stored in the repository before the given Interval. 
Find interval endings that are - * before the given beginning. + public CloseableIteration<Statement, QueryEvaluationException> queryIntervalEquals( + final TemporalInterval query, final StatementConstraints contraints) + throws QueryEvaluationException { + final Scanner scanner = getScanner(); + if (scanner != null) { + // get rows where the start and end match. + final Range range = Range.prefix(new Text(query.getAsKeyBeginning())); + scanner.setRange(range); + if (contraints.hasContext()) { + scanner.fetchColumn(new Text(contraints.getContext().toString()), new Text(KeyParts.CQ_BEGIN)); + } else { + scanner.fetchColumn(new Text(""), new Text(KeyParts.CQ_BEGIN)); + } + } + // Iterator<Entry<Key, Value>> iter = scanner.iterator(); + // while (iter.hasNext()) { + // System.out.println("queryIntervalEquals results:"+iter.next()); + // } + //return getConstrainedIteratorWrapper(scanner, contraints); + return getIteratorWrapper(scanner); + } + + /** + * find intervals stored in the repository before the given Interval. Find interval endings that are + * before the given beginning. * Indexing Intervals will probably change or be removed. * Currently predicate and subject constraints are filtered on the client. - */ - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryIntervalBefore( - TemporalInterval queryInterval, StatementContraints constraints) throws QueryEvaluationException - { - Scanner scanner = getScanner(); - if (scanner != null) { - // get rows where the end date is less than the queryInterval.getBefore() - Range range = new Range(null, false, new Key(new Text(queryInterval.getHasBeginning().getAsKeyBytes())), false); - scanner.setRange(range); - if (constraints.hasContext()) - scanner.fetchColumn(new Text(constraints.getContext().toString()), new Text(KeyParts.CQ_END)); - else - scanner.fetchColumn(new Text(""), new Text(KeyParts.CQ_END)); - } - return getIteratorWrapper(scanner); - } - - /** - * Interval after given interval. 
Find intervals that begin after the endings of the given interval. - * Use the special following prefix mechanism to avoid matching the beginning date. + */ + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryIntervalBefore( + final TemporalInterval queryInterval, final StatementConstraints constraints) throws QueryEvaluationException + { + final Scanner scanner = getScanner(); + if (scanner != null) { + // get rows where the end date is less than the queryInterval.getBefore() + final Range range = new Range(null, false, new Key(new Text(queryInterval.getHasBeginning().getAsKeyBytes())), false); + scanner.setRange(range); + if (constraints.hasContext()) { + scanner.fetchColumn(new Text(constraints.getContext().toString()), new Text(KeyParts.CQ_END)); + } else { + scanner.fetchColumn(new Text(""), new Text(KeyParts.CQ_END)); + } + } + return getIteratorWrapper(scanner); + } + + /** + * Interval after given interval. Find intervals that begin after the endings of the given interval. + * Use the special following prefix mechanism to avoid matching the beginning date. * Indexing Intervals will probably change or be removed. * Currently predicate and subject and context constraints are filtered on the client. 
- */ - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryIntervalAfter( - TemporalInterval queryInterval, StatementContraints constraints) - throws QueryEvaluationException { - - Scanner scanner = getScanner(); - if (scanner != null) { - // get rows where the start date is greater than the queryInterval.getEnd() - Range range = new Range(new Key(Range.followingPrefix(new Text(queryInterval.getHasEnd().getAsKeyBytes()))), false, null, true); - scanner.setRange(range); - - if (constraints.hasContext()) - scanner.fetchColumn(new Text(constraints.getContext().toString()), new Text(KeyParts.CQ_BEGIN)); - else - scanner.fetchColumn(new Text(""), new Text(KeyParts.CQ_BEGIN)); - } - // TODO currently predicate, subject and context constraints are filtered on the clients - return getIteratorWrapper(scanner); - } - // -- - // -- END of Query functions. Next up, general stuff used by the queries above. - // -- - - /** - * Allows passing range specific logic into doQuery. - * Each query function implements an anonymous instance of this and calls it's doQuery(). - */ - abstract class Query { - abstract protected Range getRange(KeyParts keyParts); - - public ScannerBase doQuery(TemporalInstant queryInstant, StatementContraints constraints) throws QueryEvaluationException { - // key is contraintPrefix + time, or just time. - // Any constraints handled here, if the constraints are empty, the - // thisKeyParts.contraintPrefix will be null. 
- List<KeyParts> keyParts = KeyParts.keyPartsForQuery(queryInstant, constraints); - ScannerBase scanner = null; - if (keyParts.size() > 1) - scanner = getBatchScanner(); - else - scanner = getScanner(); - - Collection<Range> ranges = new HashSet<Range>(); - KeyParts lastKeyParts = null; - Range range = null; - for (KeyParts thisKeyParts : keyParts) { - range = this.getRange(thisKeyParts); - ranges.add(range); - lastKeyParts = thisKeyParts; - } - //System.out.println("Scanning columns, cf:" + lastKeyParts.cf + "CQ:" + lastKeyParts.cq); - scanner.fetchColumn(new Text(lastKeyParts.cf), new Text(lastKeyParts.cq)); - if (scanner instanceof BatchScanner) - ((BatchScanner) scanner).setRanges(ranges); - else if (range != null) - ((Scanner) scanner).setRange(range); - return scanner; - } - } - - /** + */ + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryIntervalAfter( + final TemporalInterval queryInterval, final StatementConstraints constraints) + throws QueryEvaluationException { + + final Scanner scanner = getScanner(); + if (scanner != null) { + // get rows where the start date is greater than the queryInterval.getEnd() + final Range range = new Range(new Key(Range.followingPrefix(new Text(queryInterval.getHasEnd().getAsKeyBytes()))), false, null, true); + scanner.setRange(range); + + if (constraints.hasContext()) { + scanner.fetchColumn(new Text(constraints.getContext().toString()), new Text(KeyParts.CQ_BEGIN)); + } else { + scanner.fetchColumn(new Text(""), new Text(KeyParts.CQ_BEGIN)); + } + } + // TODO currently predicate, subject and context constraints are filtered on the clients + return getIteratorWrapper(scanner); + } + // -- + // -- END of Query functions. Next up, general stuff used by the queries above. + // -- + + /** + * Allows passing range specific logic into doQuery. + * Each query function implements an anonymous instance of this and calls it's doQuery(). 
+ */ + abstract class Query { + abstract protected Range getRange(KeyParts keyParts); + + public ScannerBase doQuery(final TemporalInstant queryInstant, final StatementConstraints constraints) throws QueryEvaluationException { + // key is contraintPrefix + time, or just time. + // Any constraints handled here, if the constraints are empty, the + // thisKeyParts.contraintPrefix will be null. + final List<KeyParts> keyParts = KeyParts.keyPartsForQuery(queryInstant, constraints); + ScannerBase scanner = null; + if (keyParts.size() > 1) { + scanner = getBatchScanner(); + } else { + scanner = getScanner(); + } + + final Collection<Range> ranges = new HashSet<Range>(); + KeyParts lastKeyParts = null; + Range range = null; + for (final KeyParts thisKeyParts : keyParts) { + range = getRange(thisKeyParts); + ranges.add(range); + lastKeyParts = thisKeyParts; + } + //System.out.println("Scanning columns, cf:" + lastKeyParts.cf + "CQ:" + lastKeyParts.cq); + scanner.fetchColumn(new Text(lastKeyParts.cf), new Text(lastKeyParts.cq)); + if (scanner instanceof BatchScanner) { + ((BatchScanner) scanner).setRanges(ranges); + } else if (range != null) { + ((Scanner) scanner).setRange(range); + } + return scanner; + } + } + + /** * An iteration wrapper for a loaded scanner that is returned for each query above. 
* * @param scanner @@ -637,16 +648,16 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements @Override public Statement next() throws QueryEvaluationException { - Entry<Key, Value> entry = i.next(); - Value v = entry.getValue(); + final Entry<Key, Value> entry = i.next(); + final Value v = entry.getValue(); try { - String dataString = Text.decode(v.get(), 0, v.getSize()); - Statement s = StatementSerializer.readStatement(dataString); + final String dataString = Text.decode(v.get(), 0, v.getSize()); + final Statement s = StatementSerializer.readStatement(dataString); return s; - } catch (CharacterCodingException e) { + } catch (final CharacterCodingException e) { logger.error("Error decoding value=" + Arrays.toString(v.get()), e); throw new QueryEvaluationException(e); - } catch (IOException e) { + } catch (final IOException e) { logger.error("Error de-serializing statement, string=" + v.get(), e); throw new QueryEvaluationException(e); } @@ -673,16 +684,17 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements * @return an anonymous object that will iterate the resulting statements from a given scanner. 
* @throws QueryEvaluationException */ - private static CloseableIteration<Statement, QueryEvaluationException> getConstrainedIteratorWrapper(final Scanner scanner, final StatementContraints constraints) { - if (!constraints.hasContext() && !constraints.hasSubject() && !constraints.hasPredicates()) - return getIteratorWrapper(scanner); - return new ConstrainedIteratorWrapper(scanner) { - @Override - public boolean allowedBy(Statement statement) { - return allowedByConstraints(statement, constraints); - } - }; - } + private static CloseableIteration<Statement, QueryEvaluationException> getConstrainedIteratorWrapper(final Scanner scanner, final StatementConstraints constraints) { + if (!constraints.hasContext() && !constraints.hasSubject() && !constraints.hasPredicates()) { + return getIteratorWrapper(scanner); + } + return new ConstrainedIteratorWrapper(scanner) { + @Override + public boolean allowedBy(final Statement statement) { + return allowedByConstraints(statement, constraints); + } + }; + } /** * An iteration wrapper for a loaded scanner that is returned for queries above. * Currently, this temporal index supports contexts only on the client, using this filter. @@ -692,82 +704,86 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements * @return an anonymous object that will iterate the resulting statements from a given scanner. * @throws QueryEvaluationException */ - private static CloseableIteration<Statement, QueryEvaluationException> getContextIteratorWrapper(final ScannerBase scanner, final Resource context) { - if (context==null) - return getIteratorWrapper(scanner); - return new ConstrainedIteratorWrapper(scanner) { - @Override - public boolean allowedBy(Statement statement) { - return allowedByContext(statement, context); - } - }; - } - /** - * Wrap a scanner in a iterator that will filter statements based on a boolean allowedBy(). - * If the allowedBy function returns false for the next statement, it is skipped. 
- * This is used for to do client side, what the index cannot (yet) do on the server side. - */ + private static CloseableIteration<Statement, QueryEvaluationException> getContextIteratorWrapper(final ScannerBase scanner, final Resource context) { + if (context==null) { + return getIteratorWrapper(scanner); + } + return new ConstrainedIteratorWrapper(scanner) { + @Override + public boolean allowedBy(final Statement statement) { + return allowedByContext(statement, context); + } + }; + } + /** + * Wrap a scanner in a iterator that will filter statements based on a boolean allowedBy(). + * If the allowedBy function returns false for the next statement, it is skipped. + * This is used for to do client side, what the index cannot (yet) do on the server side. + */ abstract static class ConstrainedIteratorWrapper implements CloseableIteration<Statement, QueryEvaluationException> { - private Statement nextStatement=null; - private boolean isInitialized = false; - final private Iterator<Entry<Key, Value>> i; - final private ScannerBase scanner; - - ConstrainedIteratorWrapper(ScannerBase scanner) { - this.scanner = scanner; - i=scanner.iterator(); - } + private Statement nextStatement=null; + private boolean isInitialized = false; + final private Iterator<Entry<Key, Value>> i; + final private ScannerBase scanner; + + ConstrainedIteratorWrapper(final ScannerBase scanner) { + this.scanner = scanner; + i=scanner.iterator(); + } @Override public boolean hasNext() throws QueryEvaluationException { - if (!isInitialized) - internalGetNext(); - return (nextStatement != null) ; + if (!isInitialized) { + internalGetNext(); + } + return (nextStatement != null) ; } @Override public Statement next() throws QueryEvaluationException { - if (nextStatement==null) { - if (!isInitialized) - internalGetNext(); - if (nextStatement==null) - throw new NoSuchElementException(); - } - // use this one, then get the next one loaded. 
- Statement thisStatement = this.nextStatement; - internalGetNext(); - return thisStatement; + if (nextStatement==null) { + if (!isInitialized) { + internalGetNext(); + } + if (nextStatement==null) { + throw new NoSuchElementException(); + } + } + // use this one, then get the next one loaded. + final Statement thisStatement = nextStatement; + internalGetNext(); + return thisStatement; } - /** - * Gets the next statement meeting constraints and stores in nextStatement. - * Sets null when all done, or on exception. - * @throws QueryEvaluationException - */ - private void internalGetNext() - throws QueryEvaluationException { - isInitialized=true; - this.nextStatement = null; // Default on done or error. - Statement statement = null; - while (i.hasNext()) { - Entry<Key, Value> entry = i.next(); - Value v = entry.getValue(); - try { - String dataString = Text.decode(v.get(), 0, v.getSize()); - statement = StatementSerializer.readStatement(dataString); - } catch (CharacterCodingException e) { - logger.error("Error decoding value=" + Arrays.toString(v.get()), e); - throw new QueryEvaluationException(e); - } catch (IOException e) { - logger.error("Error de-serializing statement, string=" + v.get(), e); - throw new QueryEvaluationException(e); - } - if (allowedBy(statement)) { - this.nextStatement = statement; - return; - } - } - } - public abstract boolean allowedBy(Statement s); + /** + * Gets the next statement meeting constraints and stores in nextStatement. + * Sets null when all done, or on exception. + * @throws QueryEvaluationException + */ + private void internalGetNext() + throws QueryEvaluationException { + isInitialized=true; + nextStatement = null; // Default on done or error. 
+ Statement statement = null; + while (i.hasNext()) { + final Entry<Key, Value> entry = i.next(); + final Value v = entry.getValue(); + try { + final String dataString = Text.decode(v.get(), 0, v.getSize()); + statement = StatementSerializer.readStatement(dataString); + } catch (final CharacterCodingException e) { + logger.error("Error decoding value=" + Arrays.toString(v.get()), e); + throw new QueryEvaluationException(e); + } catch (final IOException e) { + logger.error("Error de-serializing statement, string=" + v.get(), e); + throw new QueryEvaluationException(e); + } + if (allowedBy(statement)) { + nextStatement = statement; + return; + } + } + } + public abstract boolean allowedBy(Statement s); @Override public void remove() { @@ -786,35 +802,39 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements * @param contraints fields that are non-null must match the statement's components, otherwise it is not allowed. * @return true if the parts of the statement match the statementConstraints' parts. */ - protected static boolean allowedByConstraints(Statement statement, StatementContraints constraints) { - - if (constraints.hasSubject() && ! constraints.getSubject().toString().equals(statement.getSubject().toString())) - {System.out.println("Constrain subject: "+constraints.getSubject()+" != " + statement.getSubject()); return false;} - //return false; - - if (! allowedByContext(statement, constraints.getContext())) - return false; - //{System.out.println("Constrain context: "+constraints.getContext()+" != " + statement.getContext()); return false;} - - if (constraints.hasPredicates() && ! constraints.getPredicates().contains(statement.getPredicate())) - return false; - //{System.out.println("Constrain predicate: "+constraints.getPredicates()+" != " + statement.getPredicate()); return false;} - - System.out.println("allow statement: "+ statement.toString()); - return true; - } - - /** - * Allow only if the context matches the statement. 
This is a client side filter. - * @param statement - * @param context - * @return - */ - protected static boolean allowedByContext(Statement statement, Resource context) { - return context==null || context.equals( statement.getContext() ); - } - - @Override + protected static boolean allowedByConstraints(final Statement statement, final StatementConstraints constraints) { + + if (constraints.hasSubject() && ! constraints.getSubject().toString().equals(statement.getSubject().toString())) + {System.out.println("Constrain subject: "+constraints.getSubject()+" != " + statement.getSubject()); return false;} + //return false; + + if (! allowedByContext(statement, constraints.getContext())) + { + return false; + //{System.out.println("Constrain context: "+constraints.getContext()+" != " + statement.getContext()); return false;} + } + + if (constraints.hasPredicates() && ! constraints.getPredicates().contains(statement.getPredicate())) + { + return false; + //{System.out.println("Constrain predicate: "+constraints.getPredicates()+" != " + statement.getPredicate()); return false;} + } + + System.out.println("allow statement: "+ statement.toString()); + return true; + } + + /** + * Allow only if the context matches the statement. This is a client side filter. 
+ * @param statement + * @param context + * @return + */ + protected static boolean allowedByContext(final Statement statement, final Resource context) { + return context==null || context.equals( statement.getContext() ); + } + + @Override public Set<URI> getIndexablePredicates() { return validPredicates; @@ -829,8 +849,8 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements public void flush() throws IOException { try { mtbw.flush(); - } catch (MutationsRejectedException e) { - String msg = "Error while flushing the batch writer."; + } catch (final MutationsRejectedException e) { + final String msg = "Error while flushing the batch writer."; logger.error(msg, e); throw new IOException(msg, e); } @@ -847,8 +867,8 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements mtbw.close(); - } catch (MutationsRejectedException e) { - String msg = "Error while closing the batch writer."; + } catch (final MutationsRejectedException e) { + final String msg = "Error while closing the batch writer."; logger.error(msg, e); throw new IOException(msg, e); } @@ -861,13 +881,14 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements return ConfigUtils.getTemporalTableName(conf); } - private void deleteStatement(Statement statement) throws IOException, IllegalArgumentException { + private void deleteStatement(final Statement statement) throws IOException, IllegalArgumentException { // if the predicate list is empty, accept all predicates. 
// Otherwise, make sure the predicate is on the "valid" list - boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); - if (!isValidPredicate || !(statement.getObject() instanceof Literal)) + final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); + if (!isValidPredicate || !(statement.getObject() instanceof Literal)) { return; - DateTime[] indexDateTimes = new DateTime[2]; // 0 begin, 1 end of interval + } + final DateTime[] indexDateTimes = new DateTime[2]; // 0 begin, 1 end of interval extractDateTime(statement, indexDateTimes); if (indexDateTimes[0] == null) { return; @@ -876,49 +897,49 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements // Remove this as an instant, or interval. try { if (indexDateTimes[1] != null) { - TemporalInterval interval = new TemporalInterval(new TemporalInstantRfc3339(indexDateTimes[0]), new TemporalInstantRfc3339(indexDateTimes[1])); + final TemporalInterval interval = new TemporalInterval(new TemporalInstantRfc3339(indexDateTimes[0]), new TemporalInstantRfc3339(indexDateTimes[1])); removeInterval(temporalIndexBatchWriter, interval, statement); } else { - TemporalInstant instant = new TemporalInstantRfc3339(indexDateTimes[0]); + final TemporalInstant instant = new TemporalInstantRfc3339(indexDateTimes[0]); removeInstant(temporalIndexBatchWriter, instant, statement); } - } catch (MutationsRejectedException e) { + } catch (final MutationsRejectedException e) { throw new IOException("While adding interval/instant for statement =" + statement, e); } } @Override - public void deleteStatement(RyaStatement statement) throws IllegalArgumentException, IOException { + public void deleteStatement(final RyaStatement statement) throws IllegalArgumentException, IOException { deleteStatement(RyaToRdfConversions.convertStatement(statement)); } - @Override - public void init() { - // TODO Auto-generated method 
stub - - } - - @Override - public void setConnector(Connector connector) { - // TODO Auto-generated method stub - - } - - @Override - public void destroy() { - // TODO Auto-generated method stub - - } - - @Override - public void purge(RdfCloudTripleStoreConfiguration configuration) { - // TODO Auto-generated method stub - - } - - @Override - public void dropAndDestroy() { - // TODO Auto-generated method stub - - } + @Override + public void init() { + // TODO Auto-generated method stub + + } + + @Override + public void setConnector(final Connector connector) { + // TODO Auto-generated method stub + + } + + @Override + public void destroy() { + // TODO Auto-generated method stub + + } + + @Override + public void purge(final RdfCloudTripleStoreConfiguration configuration) { + // TODO Auto-generated method stub + + } + + @Override + public void dropAndDestroy() { + // TODO Auto-generated method stub + + } }
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalInstantRfc3339.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalInstantRfc3339.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalInstantRfc3339.java deleted file mode 100644 index a69a79f..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalInstantRfc3339.java +++ /dev/null @@ -1,218 +0,0 @@ -/** - * - */ -package mvm.rya.indexing.accumulo.temporal; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import mvm.rya.indexing.TemporalInstant; -import mvm.rya.indexing.TemporalInterval; - -import org.apache.commons.codec.binary.StringUtils; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.joda.time.format.DateTimeFormatter; -import org.joda.time.format.ISODateTimeFormat; - -/** - * Immutable date and time instance returning a human readable key. - * Preserves the Time zone, but not stored in the key. 
- * Converts fields (hours, etc) correctly for tz=Zulu when stored, - * so the original timezone is not preserved when retrieved. - * - * Uses rfc 3339, which looks like: YYYY-MM-DDThh:mm:ssZ a subset - * of ISO-8601 : https://www.ietf.org/rfc/rfc3339.txt - * - * Limits: All dates and times are assumed to be in the "current era", no BC, - * somewhere between 0000AD and 9999AD. - * - * Resolution: to the second, or millisecond if the optional fraction is used. - * - * This is really a wrapper for Joda DateTime. if you need functionality from - * that wonderful class, simply use t.getAsDateTime(). - * - */ -public class TemporalInstantRfc3339 implements TemporalInstant { - - private static final long serialVersionUID = -7790000399142290309L; - - private final DateTime dateTime; - /** - * Format key like this: YYYY-MM-DDThh:mm:ssZ - */ - public final static DateTimeFormatter FORMATTER = ISODateTimeFormat.dateTimeNoMillis(); - - /** - * New date assumed UTC time zone. - * - * @param year - * @param month - * @param day - * @param hour - * @param minute - * @param second - */ - public TemporalInstantRfc3339(int year, int month, int day, int hour, int minute, int second) { - dateTime = new DateTime(year, month, day, hour, minute, second, DateTimeZone.UTC); - } - - /** - * Construct with a Joda/java v8 DateTime; - * TZ is preserved, but not in the key. - * - * @param dateTime - * initialize with this date time. Converted to zulu time zone for key generation. - * @return - */ - public TemporalInstantRfc3339(DateTime datetime) { - this.dateTime = datetime; - } - /** - * Get an interval setting beginning and end with this implementation of {@link TemporalInstant}. - * beginning must be less than end. 
- * - * @param dateTimeInterval String in the form [dateTime1,dateTime2] - */ - public static TemporalInterval parseInterval(String dateTimeInterval) { - - Matcher matcher = Pattern.compile("\\[(.*)\\,(.*)\\].*").matcher(dateTimeInterval); - if (matcher.find()) { - // Got a date time pair, parse into an interval. - return new TemporalInterval( - new TemporalInstantRfc3339(new DateTime(matcher.group(1))), - new TemporalInstantRfc3339(new DateTime(matcher.group(2)))); - } - throw new IllegalArgumentException("Can't parse interval, expecting '[ISO8601dateTime1,ISO8601dateTime2]', actual: "+dateTimeInterval); - } - - /** - * if this is older returns -1, equal 0, else 1 - * - */ - @Override - public int compareTo(TemporalInstant that) { - return this.getAsKeyString().compareTo(that.getAsKeyString()); - } - - @Override - public byte[] getAsKeyBytes() { - return StringUtils.getBytesUtf8(getAsKeyString()); - } - - @Override - public String getAsKeyString() { - return dateTime.withZone(DateTimeZone.UTC).toString(FORMATTER); - } - - /** - * Readable string, formated local time at {@link DateTimeZone}. - * If the timezone is UTC (Z), it was probably a key from the database. - * If the server and client are in different Time zone, should probably use the client timezone. - * - * Time at specified time zone: - * instant.getAsReadable(DateTimeZone.forID("-05:00"))); - * instant.getAsReadable(DateTimeZone.getDefault())); - * - * Use original time zone set in the constructor: - * instant.getAsDateTime().toString(TemporalInstantRfc3339.FORMATTER)); - * - */ - @Override - public String getAsReadable(DateTimeZone dateTimeZone) { - return dateTime.withZone(dateTimeZone).toString(FORMATTER); - } - - /** - * Use original time zone set in the constructor, or UTC if from parsing the key. - */ - @Override - public String getAsReadable() { - return dateTime.toString(FORMATTER); - } - - /** - * default toString, same as getAsReadable(). 
- */ - @Override - public String toString() { - return getAsReadable(); - } - - /** - * Show readable time converted to the default timezone. - */ - @Override - public DateTime getAsDateTime() { - return dateTime; - } - - /** - * Minimum Date, used for infinitely past. - */ - private static final TemporalInstant MINIMUM = new TemporalInstantRfc3339(new DateTime(Long.MIN_VALUE)); - /** - * maximum date/time is used for infinitely in the future. - */ - private static final TemporalInstant MAXIMUM = new TemporalInstantRfc3339(new DateTime(Long.MAX_VALUE)); - - /** - * infinite past date. - * @return an instant that will compare as NEWER than anything but itself. - */ - public static TemporalInstant getMinimumInstance() { - return MINIMUM; - } - /** - * infinite future date. - * @return an instant that will compare as OLDER than anything but itself - */ - - public static TemporalInstant getMaximumInstance() { - return MAXIMUM; - } - - /* (non-Javadoc) - * @see java.lang.Object#hashCode() - */ - @Override - public int hashCode() { - return this.getAsKeyString().hashCode(); - } - - /* (non-Javadoc) - * @see java.lang.Object#equals(java.lang.Object) - */ - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - TemporalInstantRfc3339 other = (TemporalInstantRfc3339) obj; - return (this.getAsKeyString().equals(other.getAsKeyString())); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalTupleSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalTupleSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalTupleSet.java deleted file mode 100644 index f2ed8c4..0000000 --- 
a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalTupleSet.java +++ /dev/null @@ -1,320 +0,0 @@ -package mvm.rya.indexing.accumulo.temporal; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import info.aduna.iteration.CloseableIteration; - -import java.util.Map; -import java.util.Set; - -import mvm.rya.indexing.IndexingExpr; -import mvm.rya.indexing.IteratorFactory; -import mvm.rya.indexing.SearchFunction; -import mvm.rya.indexing.SearchFunctionFactory; -import mvm.rya.indexing.StatementContraints; -import mvm.rya.indexing.TemporalIndexer; -import mvm.rya.indexing.TemporalInstant; -import mvm.rya.indexing.TemporalInterval; -import mvm.rya.indexing.accumulo.geo.GeoTupleSet; -import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; - -import org.apache.hadoop.conf.Configuration; -import org.joda.time.DateTime; -import org.openrdf.model.Statement; -import org.openrdf.model.URI; -import org.openrdf.model.impl.URIImpl; -import org.openrdf.query.BindingSet; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.algebra.QueryModelVisitor; - -import com.google.common.base.Joiner; -import com.google.common.collect.Maps; - -//Indexing Node for temporal 
expressions to be inserted into execution plan -//to delegate temporal portion of query to temporal index -public class TemporalTupleSet extends ExternalTupleSet { - - private Configuration conf; - private TemporalIndexer temporalIndexer; - private IndexingExpr filterInfo; - - - public TemporalTupleSet(IndexingExpr filterInfo, TemporalIndexer temporalIndexer) { - this.filterInfo = filterInfo; - this.temporalIndexer = temporalIndexer; - this.conf = temporalIndexer.getConf(); - } - - /** - * {@inheritDoc} - */ - @Override - public Set<String> getBindingNames() { - return filterInfo.getBindingNames(); - } - - /** - * {@inheritDoc} - * <p> - * Note that we need a deep copy for everything that (during optimizations) - * can be altered via {@link #visitChildren(QueryModelVisitor)} - */ - public TemporalTupleSet clone() { - return new TemporalTupleSet(filterInfo, temporalIndexer); - } - - @Override - public double cardinality() { - return 0.0; // No idea how the estimate cardinality here. - } - - - @Override - public String getSignature() { - - return "(TemporalTuple Projection) " + "variables: " + Joiner.on(", ").join(this.getBindingNames()).replaceAll("\\s+", " "); - } - - @Override - public boolean equals(Object other) { - if (other == this) { - return true; - } - if (!(other instanceof TemporalTupleSet)) { - return false; - } - TemporalTupleSet arg = (TemporalTupleSet) other; - return this.filterInfo.equals(arg.filterInfo); - } - - - @Override - public int hashCode() { - int result = 17; - result = 31*result + filterInfo.hashCode(); - - return result; - } - - - /** - * Returns an iterator over the result set associated with contained IndexingExpr. - * <p> - * Should be thread-safe (concurrent invocation {@link OfflineIterable} this - * method can be expected with some query evaluators. 
- */ - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) - throws QueryEvaluationException { - - - URI funcURI = filterInfo.getFunction(); - SearchFunction searchFunction = (new TemporalSearchFunctionFactory(conf)).getSearchFunction(funcURI); - - if(filterInfo.getArguments().length > 1) { - throw new IllegalArgumentException("Index functions do not support more than two arguments."); - } - - String queryText = filterInfo.getArguments()[0].stringValue(); - - return IteratorFactory.getIterator(filterInfo.getSpConstraint(), bindings, queryText, searchFunction); - } - - - //returns appropriate search function for a given URI - //search functions used by TemporalIndexer to query Temporal Index - private class TemporalSearchFunctionFactory { - - private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap(); - Configuration conf; - - public TemporalSearchFunctionFactory(Configuration conf) { - this.conf = conf; - } - - - /** - * Get a {@link TemporalSearchFunction} for a give URI. 
- * - * @param searchFunction - * @return - */ - public SearchFunction getSearchFunction(final URI searchFunction) { - - SearchFunction geoFunc = null; - - try { - geoFunc = getSearchFunctionInternal(searchFunction); - } catch (QueryEvaluationException e) { - e.printStackTrace(); - } - - return geoFunc; - } - - private SearchFunction getSearchFunctionInternal(final URI searchFunction) throws QueryEvaluationException { - SearchFunction sf = SEARCH_FUNCTION_MAP.get(searchFunction); - - if (sf != null) { - return sf; - } else { - throw new QueryEvaluationException("Unknown Search Function: " + searchFunction.stringValue()); - } - - - } - - - - private final SearchFunction TEMPORAL_InstantAfterInstant = new SearchFunction() { - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, - StatementContraints contraints) throws QueryEvaluationException { - TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms)); - return temporalIndexer.queryInstantAfterInstant(queryInstant, contraints); - } - - @Override - public String toString() { - return "TEMPORAL_InstantAfterInstant"; - }; - }; - private final SearchFunction TEMPORAL_InstantBeforeInstant = new SearchFunction() { - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, - StatementContraints contraints) throws QueryEvaluationException { - TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms)); - return temporalIndexer.queryInstantBeforeInstant(queryInstant, contraints); - } - - @Override - public String toString() { - return "TEMPORAL_InstantBeforeInstant"; - }; - }; - - private final SearchFunction TEMPORAL_InstantEqualsInstant = new SearchFunction() { - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, - StatementContraints contraints) throws QueryEvaluationException { - TemporalInstant 
queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms)); - return temporalIndexer.queryInstantEqualsInstant(queryInstant, contraints); - } - - @Override - public String toString() { - return "TEMPORAL_InstantEqualsInstant"; - }; - }; - - private final SearchFunction TEMPORAL_InstantAfterInterval = new SearchFunction() { - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, - StatementContraints contraints) throws QueryEvaluationException { - TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); - return temporalIndexer.queryInstantAfterInterval(queryInterval, contraints); - } - - @Override - public String toString() { - return "TEMPORAL_InstantAfterInterval"; - }; - }; - - private final SearchFunction TEMPORAL_InstantBeforeInterval = new SearchFunction() { - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, - StatementContraints contraints) throws QueryEvaluationException { - TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); - return temporalIndexer.queryInstantBeforeInterval(queryInterval, contraints); - } - - @Override - public String toString() { - return "TEMPORAL_InstantBeforeInterval"; - }; - }; - - private final SearchFunction TEMPORAL_InstantInsideInterval = new SearchFunction() { - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, - StatementContraints contraints) throws QueryEvaluationException { - TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); - return temporalIndexer.queryInstantInsideInterval(queryInterval, contraints); - } - - @Override - public String toString() { - return "TEMPORAL_InstantInsideInterval"; - }; - }; - - private final SearchFunction TEMPORAL_InstantHasBeginningInterval = new SearchFunction() { - @Override - public CloseableIteration<Statement, 
QueryEvaluationException> performSearch(String searchTerms, - StatementContraints contraints) throws QueryEvaluationException { - TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); - return temporalIndexer.queryInstantHasBeginningInterval(queryInterval, contraints); - } - - @Override - public String toString() { - return "TEMPORAL_InstantHasBeginningInterval"; - }; - }; - - private final SearchFunction TEMPORAL_InstantHasEndInterval = new SearchFunction() { - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, - StatementContraints contraints) throws QueryEvaluationException { - TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); - return temporalIndexer.queryInstantHasEndInterval(queryInterval, contraints); - } - - @Override - public String toString() { - return "TEMPORAL_InstantHasEndInterval"; - }; - }; - - { - - String TEMPORAL_NS = "tag:rya-rdf.org,2015:temporal#"; - - SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"after"), TEMPORAL_InstantAfterInstant); - SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"before"), TEMPORAL_InstantBeforeInstant); - SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"equals"), TEMPORAL_InstantEqualsInstant); - - SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"beforeInterval"), TEMPORAL_InstantBeforeInterval); - SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"afterInterval"), TEMPORAL_InstantAfterInterval); - SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"insideInterval"), TEMPORAL_InstantInsideInterval); - SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"hasBeginningInterval"), - TEMPORAL_InstantHasBeginningInterval); - SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"hasEndInterval"), TEMPORAL_InstantHasEndInterval); - - } - - - - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java 
---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java index cce0a81..6aaf2c4 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java @@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkNotNull; import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import javax.annotation.ParametersAreNonnullByDefault; @@ -34,6 +36,7 @@ import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater.PcjUpdateException; +import org.openrdf.model.URI; import com.google.common.base.Optional; import com.google.common.base.Supplier; @@ -114,7 +117,7 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer { @Override public Configuration getConf() { - return this.conf.get(); + return conf.get(); } /** @@ -252,4 +255,9 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer { log.warn("PCJ indicies are not stored within a single table, so this method can not be implemented."); return null; } + + @Override + public Set<URI> getIndexablePredicates() { + return new HashSet<>(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java ---------------------------------------------------------------------- diff --git 
a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java index 8d7b180..ed202f4 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java @@ -20,6 +20,12 @@ package mvm.rya.indexing.external.fluo; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_INSTANCE; +import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_PASSWORD; +import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_USERNAME; +import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_ZOOKEEPERS; +import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.FLUO_APP_NAME; +import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.STATEMENT_VISIBILITY; import javax.annotation.ParametersAreNonnullByDefault; @@ -67,13 +73,13 @@ public class FluoPcjUpdaterSupplier implements Supplier<FluoPcjUpdater> { final FluoPcjUpdaterConfig fluoUpdaterConfig = new FluoPcjUpdaterConfig( indexerConfig.getConfig() ); // Make sure the required values are present. 
- checkArgument(fluoUpdaterConfig.getFluoAppName().isPresent(), "Missing configuration: " + FluoPcjUpdaterConfig.FLUO_APP_NAME); - checkArgument(fluoUpdaterConfig.getFluoZookeepers().isPresent(), "Missing configuration: " + FluoPcjUpdaterConfig.ACCUMULO_ZOOKEEPERS); - checkArgument(fluoUpdaterConfig.getAccumuloZookeepers().isPresent(), "Missing configuration: " + FluoPcjUpdaterConfig.ACCUMULO_ZOOKEEPERS); - checkArgument(fluoUpdaterConfig.getAccumuloInstance().isPresent(), "Missing configuration: " + FluoPcjUpdaterConfig.ACCUMULO_INSTANCE); - checkArgument(fluoUpdaterConfig.getAccumuloUsername().isPresent(), "Missing configuration: " + FluoPcjUpdaterConfig.ACCUMULO_USERNAME); - checkArgument(fluoUpdaterConfig.getAccumuloPassword().isPresent(), "Missing configuration: " + FluoPcjUpdaterConfig.ACCUMULO_PASSWORD); - checkArgument(fluoUpdaterConfig.getStatementVisibility().isPresent(), "Missing configuration: " + FluoPcjUpdaterConfig.STATEMENT_VISIBILITY); + checkArgument(fluoUpdaterConfig.getFluoAppName().isPresent(), "Missing configuration: " + FLUO_APP_NAME); + checkArgument(fluoUpdaterConfig.getFluoZookeepers().isPresent(), "Missing configuration: " + ACCUMULO_ZOOKEEPERS); + checkArgument(fluoUpdaterConfig.getAccumuloZookeepers().isPresent(), "Missing configuration: " + ACCUMULO_ZOOKEEPERS); + checkArgument(fluoUpdaterConfig.getAccumuloInstance().isPresent(), "Missing configuration: " + ACCUMULO_INSTANCE); + checkArgument(fluoUpdaterConfig.getAccumuloUsername().isPresent(), "Missing configuration: " + ACCUMULO_USERNAME); + checkArgument(fluoUpdaterConfig.getAccumuloPassword().isPresent(), "Missing configuration: " + ACCUMULO_PASSWORD); + checkArgument(fluoUpdaterConfig.getStatementVisibility().isPresent(), "Missing configuration: " + STATEMENT_VISIBILITY); // Fluo configuration values. 
final FluoConfiguration fluoClientConfig = new FluoConfiguration(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/AbstractMongoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/AbstractMongoIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/AbstractMongoIndexer.java index 5f2e700..078cbd2 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/AbstractMongoIndexer.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/AbstractMongoIndexer.java @@ -1,5 +1,7 @@ package mvm.rya.indexing.mongodb; +import static com.google.common.base.Preconditions.checkNotNull; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -8,9 +10,9 @@ package mvm.rya.indexing.mongodb; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,49 +21,170 @@ package mvm.rya.indexing.mongodb; * under the License. 
*/ - import java.io.IOException; import java.util.Collection; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.openrdf.model.Literal; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.query.QueryEvaluationException; + +import com.mongodb.DB; +import com.mongodb.DBCollection; +import com.mongodb.DBCursor; +import com.mongodb.DBObject; +import com.mongodb.MongoClient; +import com.mongodb.QueryBuilder; +import info.aduna.iteration.CloseableIteration; import mvm.rya.api.domain.RyaStatement; import mvm.rya.api.domain.RyaURI; import mvm.rya.api.persist.index.RyaSecondaryIndexer; +import mvm.rya.api.resolver.RyaToRdfConversions; +import mvm.rya.indexing.StatementConstraints; +import mvm.rya.mongodb.MongoDBRdfConfiguration; -import org.apache.hadoop.conf.Configuration; +/** + * Secondary Indexer using MongoDB + * @param <T> - The {@link IndexingMongoDBStorageStrategy} this indexer uses. + */ +public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrategy> implements RyaSecondaryIndexer { + private static final Logger LOG = Logger.getLogger(AbstractMongoIndexer.class); + + private boolean isInit = false; + protected Configuration conf; + protected MongoClient mongoClient; + protected String dbName; + protected DB db; + protected DBCollection collection; + protected Set<URI> predicates; + + protected T storageStrategy; -public abstract class AbstractMongoIndexer implements RyaSecondaryIndexer { + /** + * Creates a new {@link AbstractMongoIndexer} with the provided mongo client. + * @param mongoClient The {@link MongoClient} to use with this indexer.
+ */ + public AbstractMongoIndexer(final MongoClient mongoClient) { + this.mongoClient = checkNotNull(mongoClient); + } + + protected void init() throws NumberFormatException, IOException{ + dbName = conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME); + db = this.mongoClient.getDB(dbName); + collection = db.getCollection(conf.get(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya") + getCollectionName()); + } + + @Override + public void setConf(final Configuration conf) { + this.conf = conf; + if (!isInit) { + try { + init(); + isInit = true; + } catch (final NumberFormatException | IOException e) { + LOG.warn("Unable to initialize index. Throwing Runtime Exception. ", e); + throw new RuntimeException(e); + } + } + } @Override public void close() throws IOException { + mongoClient.close(); } @Override public void flush() throws IOException { } - @Override public Configuration getConf() { - return null; + return conf; } - + @Override public String getTableName() { - return null; + return dbName; } @Override - public void storeStatements(Collection<RyaStatement> ryaStatements) + public Set<URI> getIndexablePredicates() { + return predicates; + } + + @Override + public void deleteStatement(final RyaStatement stmt) throws IOException { + final DBObject obj = storageStrategy.getQuery(stmt); + collection.remove(obj); + } + + @Override + public void storeStatements(final Collection<RyaStatement> ryaStatements) throws IOException { - for (RyaStatement ryaStatement : ryaStatements){ + for (final RyaStatement ryaStatement : ryaStatements){ storeStatement(ryaStatement); } - } @Override - public void dropGraph(RyaURI... 
graphs) { + public void storeStatement(final RyaStatement ryaStatement) throws IOException { + try { + final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); + final boolean isValidPredicate = predicates.isEmpty() || predicates.contains(statement.getPredicate()); + if (isValidPredicate && (statement.getObject() instanceof Literal)) { + final DBObject obj = storageStrategy.serialize(ryaStatement); + if (obj != null) { + final DBObject query = storageStrategy.serialize(ryaStatement); + collection.update(query, obj, true, false); + } + } + } catch (final IllegalArgumentException e) { + LOG.error("Unable to parse the statement: " + ryaStatement.toString()); + } + } + + @Override + public void dropGraph(final RyaURI... graphs) { throw new UnsupportedOperationException(); } + protected CloseableIteration<Statement, QueryEvaluationException> withConstraints(final StatementConstraints constraints, final DBObject preConstraints) { + final DBObject dbo = QueryBuilder.start().and(preConstraints).and(storageStrategy.getQuery(constraints)).get(); + return closableIterationFromCursor(dbo); + } + + private CloseableIteration<Statement, QueryEvaluationException> closableIterationFromCursor(final DBObject dbo) { + final DBCursor cursor = collection.find(dbo); + return new CloseableIteration<Statement, QueryEvaluationException>() { + @Override + public boolean hasNext() { + return cursor.hasNext(); + } + + @Override + public Statement next() throws QueryEvaluationException { + final DBObject dbo = cursor.next(); + return RyaToRdfConversions.convertStatement(storageStrategy.deserializeDBObject(dbo)); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove not implemented"); + } + + @Override + public void close() throws QueryEvaluationException { + cursor.close(); + } + }; + } + + /** + * @return The name of the {@link DBCollection} to use with the storage strategy. + */ + public abstract String getCollectionName(); }
