http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/ShardSubjectLookupStatementIterator.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/ShardSubjectLookupStatementIterator.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/ShardSubjectLookupStatementIterator.java new file mode 100644 index 0000000..097c52c --- /dev/null +++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/ShardSubjectLookupStatementIterator.java @@ -0,0 +1,493 @@ +package mvm.mmrts.rdf.partition.query.evaluation; + +import cloudbase.core.client.BatchScanner; +import cloudbase.core.client.Connector; +import cloudbase.core.client.TableNotFoundException; +import cloudbase.core.data.Key; +import cloudbase.core.data.Range; +import cloudbase.core.data.Value; +import cloudbase.core.security.Authorizations; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import info.aduna.iteration.CloseableIteration; +import info.aduna.iteration.EmptyIteration; +import mvm.mmrts.rdf.partition.PartitionSail; +import mvm.mmrts.rdf.partition.query.evaluation.select.FilterIterator; +import mvm.mmrts.rdf.partition.query.evaluation.select.SelectAllIterator; +import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup; +import mvm.mmrts.rdf.partition.shard.DateHashModShardValueGenerator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.openrdf.model.URI; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Var; +import ss.cloudbase.core.iterators.CellLevelRecordIterator; +import ss.cloudbase.core.iterators.GMDenIntersectingIterator; +import ss.cloudbase.core.iterators.SortedRangeIterator; 
+import ss.cloudbase.core.iterators.filter.CBConverter; + +import java.io.IOException; +import java.util.*; + +import static mvm.mmrts.rdf.partition.PartitionConstants.*; +import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue; + +/** + * Class ShardSubjectLookupStatementIterator + * Date: Jul 18, 2011 + * Time: 10:53:55 AM + */ +public class ShardSubjectLookupStatementIterator implements + CloseableIteration<BindingSet, QueryEvaluationException> { + + private Connector connector; + private String table; + //MMRTS-148 + private String shardTable; + private ShardSubjectLookup lookup; + private DateHashModShardValueGenerator generator; + private BatchScanner scanner; + private BindingSet bindings; + private CloseableIteration<BindingSet, QueryEvaluationException> iter; + private Configuration configuration; +// private TimeType timeType = TimeType.XMLDATETIME; + private Authorizations authorizations = ALL_AUTHORIZATIONS; + + private int numThreads; + + public ShardSubjectLookupStatementIterator(PartitionSail psail, ShardSubjectLookup lookup, BindingSet bindings, Configuration configuration) throws QueryEvaluationException { + this.connector = psail.getConnector(); + this.lookup = lookup; + this.table = psail.getTable(); + this.shardTable = psail.getShardTable(); + this.bindings = bindings; + this.configuration = configuration; + + //Time Type check +// timeType = TimeType.valueOf(this.configuration.get(TIME_TYPE_PROP, TimeType.XMLDATETIME.name())); + + //authorizations + String auths = this.configuration.get(AUTHORIZATION_PROP); + if (auths != null) { + authorizations = new Authorizations(auths.split(",")); + } + + //TODO: for now we need this + this.generator = (DateHashModShardValueGenerator) psail.getGenerator(); + + this.numThreads = this.configuration.getInt(NUMTHREADS_PROP, generator.getBaseMod()); + + this.initialize(); + } + + public void initialize() throws QueryEvaluationException { + try { + /** + * Here we will set up the BatchScanner based on the 
lookup + */ + Var subject = lookup.getSubject(); + List<Map.Entry<Var, Var>> where = retrieveWhereClause(); + List<Map.Entry<Var, Var>> select = retrieveSelectClause(); + + //global start-end time + long start = configuration.getLong(START_BINDING, 0); + long end = configuration.getLong(END_BINDING, System.currentTimeMillis()); + + int whereSize = where.size() + select.size() + ((!isTimeRange(lookup, configuration)) ? 0 : 1); + + if (subject.hasValue() + && where.size() == 0 /* Not using whereSize, because we can set up the TimeRange in the scanner */ + && select.size() == 0) { + /** + * Case 1: Subject is set, but predicate, object are not. + * Return all for the subject + */ + this.scanner = scannerForSubject(subject.getValue()); + if (this.scanner == null) { + this.iter = new EmptyIteration(); + return; + } + Map.Entry<Var, Var> predObj = lookup.getPredicateObjectPairs().get(0); + this.iter = new SelectAllIterator(this.bindings, this.scanner.iterator(), predObj.getKey(), predObj.getValue()); + } else if (subject.hasValue() + && where.size() == 0 /* Not using whereSize, because we can set up the TimeRange in the scanner */) { + /** + * Case 2: Subject is set, and a few predicates are set, but no objects + * Return all, and filter which predicates you are interested in + */ + this.scanner = scannerForSubject(subject.getValue()); + if (this.scanner == null) { + this.iter = new EmptyIteration(); + return; + } + this.iter = new FilterIterator(this.bindings, this.scanner.iterator(), subject, select); + } else if (subject.hasValue() + && where.size() >= 1 /* Not using whereSize, because we can set up the TimeRange in the scanner */) { + /** + * Case 2a: Subject is set, and a few predicates are set, and one object + * TODO: For now we will ignore the predicate-object filter because we do not know how to query for this + */ + this.scanner = scannerForSubject(subject.getValue()); + if (this.scanner == null) { + this.iter = new EmptyIteration(); + return; + } + this.iter = 
new FilterIterator(this.bindings, this.scanner.iterator(), subject, select); + } else if (!subject.hasValue() && whereSize > 1) { + /** + * Case 3: Subject is not set, more than one where clause + */ + this.scanner = scannerForPredicateObject(lookup, start, end, where, select); + if (this.scanner == null) { + this.iter = new EmptyIteration(); + return; + } + this.iter = new FilterIterator(this.bindings, this.scanner.iterator(), subject, select); +// this.iter = new SubjectSelectIterator(this.bindings, this.scanner.iterator(), subject, select); + } else if (!subject.hasValue() && whereSize == 1 && select.size() == 0) { + /** + * Case 4: No subject, only one where clause + */ + Map.Entry<Var, Var> predObj = null; + if (where.size() == 1) { + predObj = where.get(0); + } + this.scanner = scannerForPredicateObject(lookup, start, end, predObj); + if (this.scanner == null) { + this.iter = new EmptyIteration(); + return; + } + this.iter = new FilterIterator(this.bindings, this.scanner.iterator(), subject, select); +// this.iter = new SubjectSelectIterator(this.bindings, this.scanner.iterator(), subject, select); + } else if (!subject.hasValue() && select.size() > 1) { + + /** + * Case 5: No subject, no where (multiple select) + */ + this.scanner = scannerForPredicates(start, end, select); + if (this.scanner == null) { + this.iter = new EmptyIteration(); + return; + } + this.iter = new FilterIterator(this.bindings, this.scanner.iterator(), subject, select); + } else if (!subject.hasValue() && select.size() == 1) { + /** + * Case 5: No subject, no where (just 1 select) + */ + cloudbase.core.client.Scanner sc = scannerForPredicate(lookup, start, end, (URI) select.get(0).getKey().getValue()); + if (sc == null) { + this.iter = new EmptyIteration(); + return; + } //TODO: Fix, put in concrete class + final Iterator<Map.Entry<Key, Value>> scIter = sc.iterator(); + this.iter = new FilterIterator(this.bindings, scIter, subject, select); + } else { + throw new 
QueryEvaluationException("Case not supported as of yet"); + } + + } catch (Exception e) { + throw new QueryEvaluationException(e); + } + } + + protected List<Map.Entry<Var, Var>> retrieveWhereClause() { + List<Map.Entry<Var, Var>> where = new ArrayList<Map.Entry<Var, Var>>(); + for (Map.Entry<Var, Var> entry : lookup.getPredicateObjectPairs()) { + Var pred = entry.getKey(); + Var object = entry.getValue(); + if (pred.hasValue() && object.hasValue()) { + where.add(entry); //TODO: maybe we should clone this? + } + } + return where; + } + + protected List<Map.Entry<Var, Var>> retrieveSelectClause() { + List<Map.Entry<Var, Var>> select = new ArrayList<Map.Entry<Var, Var>>(); + for (Map.Entry<Var, Var> entry : lookup.getPredicateObjectPairs()) { + Var pred = entry.getKey(); + Var object = entry.getValue(); + if (pred.hasValue() && !object.hasValue()) { + select.add(entry); //TODO: maybe we should clone this? + } + } + return select; + } + + @Override + public void close() throws QueryEvaluationException { + if (this.scanner != null) { + this.scanner.close(); + } + } + + @Override + public boolean hasNext() throws QueryEvaluationException { + return iter.hasNext(); + } + + @Override + public BindingSet next() throws QueryEvaluationException { + try { + return iter.next(); + } catch (Exception e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public void remove() throws QueryEvaluationException { + iter.next(); + } + + /** + * Utility methods to set up the scanner/batch scanner + */ + + protected List<Text> shardForSubject(org.openrdf.model.Value subject) throws TableNotFoundException, IOException { + BatchScanner scanner = createBatchScanner(this.shardTable); + try { + scanner.setRanges(Collections.singleton( + new Range(new Text(writeValue(subject))) + )); + Iterator<Map.Entry<Key, Value>> shardIter = scanner.iterator(); + if (!shardIter.hasNext()) { + return null; + } + + List<Text> shards = new ArrayList<Text>(); + while (shardIter.hasNext()) { + 
shards.add(shardIter.next().getKey().getColumnFamily()); + } + //MMRTS-147 so that we can return subjects from multiple shards + return shards; + } finally { + if (scanner != null) + scanner.close(); + } + } + + + protected BatchScanner scannerForSubject(org.openrdf.model.Value subject) throws TableNotFoundException, IOException { + List<Text> shards = shardForSubject(subject); + + if (shards == null) + return null; + + BatchScanner scanner = createBatchScanner(this.table); + +// scanner.setScanIterators(21, CellLevelRecordIterator.class.getName(), "ci"); + Collection<Range> ranges = new ArrayList<Range>(); + for (Text shard : shards) { + ranges.add(new Range( + new Key( + shard, DOC, + new Text(URI_MARKER_STR + subject + FAMILY_DELIM_STR + "\0") + ), + new Key( + shard, DOC, + new Text(URI_MARKER_STR + subject + FAMILY_DELIM_STR + "\uFFFD") + ) + )); + } + scanner.setRanges(ranges); + return scanner; + } + + protected BatchScanner scannerForPredicateObject(ShardSubjectLookup lookup, Long start, Long end, List<Map.Entry<Var, Var>> predObjs, List<Map.Entry<Var, Var>> select) throws IOException, TableNotFoundException { + start = validateFillStartTime(start, lookup); + end = validateFillEndTime(end, lookup); + + int extra = 0; + + if (isTimeRange(lookup, configuration)) { + extra += 1; + } + + Text[] queries = new Text[predObjs.size() + select.size() + extra]; + int qi = 0; + for (Map.Entry<Var, Var> predObj : predObjs) { + ByteArrayDataOutput output = ByteStreams.newDataOutput(); + writeValue(output, predObj.getKey().getValue()); + output.write(INDEX_DELIM); + writeValue(output, predObj.getValue().getValue()); + queries[qi++] = new Text(output.toByteArray()); + } + for (Map.Entry<Var, Var> predicate : select) { + queries[qi++] = new Text(GMDenIntersectingIterator.getRangeTerm(INDEX.toString(), + URI_MARKER_STR + predicate.getKey().getValue() + INDEX_DELIM_STR + "\0" + , true, + URI_MARKER_STR + predicate.getKey().getValue() + INDEX_DELIM_STR + "\uFFFD", + true + )); 
+ } + + if (isTimeRange(lookup, configuration)) { + queries[queries.length - 1] = new Text( + GMDenIntersectingIterator.getRangeTerm(INDEX.toString(), + getStartTimeRange(lookup, configuration) + , true, + getEndTimeRange(lookup, configuration), + true + ) + ); + } + + BatchScanner bs = createBatchScanner(this.table); + + bs.setScanIterators(21, CellLevelRecordIterator.class.getName(), "ci"); + bs.setScanIteratorOption("ci", CBConverter.OPTION_VALUE_DELIMITER, VALUE_DELIMITER); + + bs.setScanIterators(20, GMDenIntersectingIterator.class.getName(), "ii"); + bs.setScanIteratorOption("ii", GMDenIntersectingIterator.docFamilyOptionName, DOC.toString()); + bs.setScanIteratorOption("ii", GMDenIntersectingIterator.indexFamilyOptionName, INDEX.toString()); + bs.setScanIteratorOption("ii", GMDenIntersectingIterator.columnFamiliesOptionName, GMDenIntersectingIterator.encodeColumns(queries)); + bs.setScanIteratorOption("ii", GMDenIntersectingIterator.OPTION_MULTI_DOC, "" + true); + + Range range = new Range( + new Key(new Text(generator.generateShardValue(start, null) + "\0")), + new Key(new Text(generator.generateShardValue(end, null) + "\uFFFD")) + ); + bs.setRanges(Collections.singleton( + range + )); + + return bs; + } + + protected BatchScanner scannerForPredicateObject(ShardSubjectLookup lookup, Long start, Long end, Map.Entry<Var, Var> predObj) throws IOException, TableNotFoundException { + start = validateFillStartTime(start, lookup); + end = validateFillEndTime(end, lookup); + + BatchScanner bs = createBatchScanner(this.table); + + bs.setScanIterators(21, CellLevelRecordIterator.class.getName(), "ci"); + bs.setScanIteratorOption("ci", CBConverter.OPTION_VALUE_DELIMITER, VALUE_DELIMITER); + + bs.setScanIterators(20, SortedRangeIterator.class.getName(), "ri"); + bs.setScanIteratorOption("ri", SortedRangeIterator.OPTION_DOC_COLF, DOC.toString()); + bs.setScanIteratorOption("ri", SortedRangeIterator.OPTION_COLF, INDEX.toString()); + bs.setScanIteratorOption("ri", 
SortedRangeIterator.OPTION_START_INCLUSIVE, "" + true); + bs.setScanIteratorOption("ri", SortedRangeIterator.OPTION_END_INCLUSIVE, "" + true); + bs.setScanIteratorOption("ri", SortedRangeIterator.OPTION_MULTI_DOC, "" + true); + + if (isTimeRange(lookup, configuration)) { + String startRange = getStartTimeRange(lookup, configuration); + bs.setScanIteratorOption("ri", SortedRangeIterator.OPTION_LOWER_BOUND, + startRange); + String endRange = getEndTimeRange(lookup, configuration); + bs.setScanIteratorOption("ri", SortedRangeIterator.OPTION_UPPER_BOUND, + endRange); + } else { + + ByteArrayDataOutput output = ByteStreams.newDataOutput(); + writeValue(output, predObj.getKey().getValue()); + output.write(INDEX_DELIM); + writeValue(output, predObj.getValue().getValue()); + + String bound = new String(output.toByteArray()); + bs.setScanIteratorOption("ri", SortedRangeIterator.OPTION_LOWER_BOUND, bound); + bs.setScanIteratorOption("ri", SortedRangeIterator.OPTION_UPPER_BOUND, bound + "\00"); + } + + //TODO: Do we add a time predicate to this? +// bs.setScanIterators(19, FilteringIterator.class.getName(), "filteringIterator"); +// bs.setScanIteratorOption("filteringIterator", "0", TimeRangeFilter.class.getName()); +// bs.setScanIteratorOption("filteringIterator", "0." + TimeRangeFilter.TIME_RANGE_PROP, (end - start) + ""); +// bs.setScanIteratorOption("filteringIterator", "0." 
+ TimeRangeFilter.START_TIME_PROP, end + ""); + + Range range = new Range( + new Key(new Text(generator.generateShardValue(start, null) + "\0")), + new Key(new Text(generator.generateShardValue(end, null) + "\uFFFD")) + ); + bs.setRanges(Collections.singleton( + range + )); + + return bs; + } + + protected BatchScanner scannerForPredicates(Long start, Long end, List<Map.Entry<Var, Var>> predicates) throws IOException, TableNotFoundException { + start = validateFillStartTime(start, lookup); + end = validateFillEndTime(end, lookup); + + int extra = 0; + + if (isTimeRange(lookup, configuration)) { + extra += 1; + } + + Text[] queries = new Text[predicates.size() + extra]; + for (int i = 0; i < predicates.size(); i++) { + Map.Entry<Var, Var> predicate = predicates.get(i); + queries[i] = new Text(GMDenIntersectingIterator.getRangeTerm(INDEX.toString(), + URI_MARKER_STR + predicate.getKey().getValue() + INDEX_DELIM_STR + "\0" + , true, + URI_MARKER_STR + predicate.getKey().getValue() + INDEX_DELIM_STR + "\uFFFD", + true + )); + } + + if (isTimeRange(lookup, configuration)) { + queries[queries.length - 1] = new Text( + GMDenIntersectingIterator.getRangeTerm(INDEX.toString(), + getStartTimeRange(lookup, configuration) + , true, + getEndTimeRange(lookup, configuration), + true + ) + ); + } + + BatchScanner bs = createBatchScanner(this.table); + bs.setScanIterators(21, CellLevelRecordIterator.class.getName(), "ci"); + bs.setScanIteratorOption("ci", CBConverter.OPTION_VALUE_DELIMITER, VALUE_DELIMITER); + + bs.setScanIterators(20, GMDenIntersectingIterator.class.getName(), "ii"); + bs.setScanIteratorOption("ii", GMDenIntersectingIterator.docFamilyOptionName, DOC.toString()); + bs.setScanIteratorOption("ii", GMDenIntersectingIterator.indexFamilyOptionName, INDEX.toString()); + bs.setScanIteratorOption("ii", GMDenIntersectingIterator.columnFamiliesOptionName, GMDenIntersectingIterator.encodeColumns(queries)); + bs.setScanIteratorOption("ii", 
GMDenIntersectingIterator.OPTION_MULTI_DOC, "" + true); + + Range range = new Range( + new Key(new Text(generator.generateShardValue(start, null) + "\0")), + new Key(new Text(generator.generateShardValue(end, null) + "\uFFFD")) + ); + bs.setRanges(Collections.singleton( + range + )); + + return bs; + } + + protected cloudbase.core.client.Scanner scannerForPredicate(ShardSubjectLookup lookup, Long start, Long end, URI predicate) throws IOException, TableNotFoundException { + start = validateFillStartTime(start, lookup); + end = validateFillEndTime(end, lookup); + + cloudbase.core.client.Scanner sc = createScanner(this.table); + + Range range = new Range( + new Key(new Text(generator.generateShardValue(start, null) + "\0")), + new Key(new Text(generator.generateShardValue(end, null) + "\uFFFD")) + ); + sc.setRange(range); + sc.fetchColumnFamily(INDEX); + sc.setColumnFamilyRegex(INDEX.toString()); + sc.setColumnQualifierRegex(URI_MARKER_STR + predicate + INDEX_DELIM_STR + "(.*)"); + + return sc; + } + + protected cloudbase.core.client.Scanner createScanner(String sTable) throws TableNotFoundException { + return connector.createScanner(sTable, authorizations); + } + + protected BatchScanner createBatchScanner(String sTable) throws TableNotFoundException { + return createBatchScanner(sTable, numThreads); + } + + protected BatchScanner createBatchScanner(String sTable, int numThreads) throws TableNotFoundException { + return connector.createBatchScanner(sTable, authorizations, numThreads); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/SubjectGroupingOptimizer.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/SubjectGroupingOptimizer.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/SubjectGroupingOptimizer.java new file mode 100644 index 0000000..782cfb9 --- /dev/null +++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/SubjectGroupingOptimizer.java @@ -0,0 +1,178 @@ +package mvm.mmrts.rdf.partition.query.evaluation; + +import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup; +import mvm.mmrts.rdf.partition.utils.CountPredObjPairs; +import org.apache.hadoop.conf.Configuration; +import org.openrdf.query.BindingSet; +import org.openrdf.query.Dataset; +import org.openrdf.query.algebra.Join; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.evaluation.QueryOptimizer; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +import static mvm.mmrts.rdf.partition.PartitionConstants.*; + +/** + * Date: Jul 14, 2011 + * Time: 4:14:16 PM + */ +public class SubjectGroupingOptimizer implements QueryOptimizer { + + private static final Comparator<Var> VAR_COMPARATOR = new VarComparator(); + private static final Comparator<StatementPattern> SP_SUBJ_COMPARATOR = new SubjectComparator(); + private static final Comparator<TupleExpr> STATS_SHARD_COMPARATOR = new ShardLookupComparator(); + private static final CountPredObjPairs STATISTICS = new CountPredObjPairs(); + private Configuration conf; + + public 
SubjectGroupingOptimizer(Configuration conf) { + this.conf = conf; + } + + @Override + public void optimize(TupleExpr tupleExpr, Dataset dataset, BindingSet bindingSet) { + tupleExpr.visit(new FlattenJoinVisitor()); + } + + protected class FlattenJoinVisitor extends QueryModelVisitorBase<RuntimeException> { + @Override + public void meet(Join node) throws RuntimeException { + List<StatementPattern> flatten = getJoinArgs(node, new ArrayList<StatementPattern>()); + //order by subject + Collections.sort(flatten, SP_SUBJ_COMPARATOR); + + List<TupleExpr> shardLookups = new ArrayList<TupleExpr>(); + Var current = null; + ShardSubjectLookup shardLookupCurrent = null; + for (StatementPattern sp : flatten) { + if (!sp.getSubjectVar().hasValue() && !sp.getPredicateVar().hasValue()) { + // if there is nothing set in the subject or predicate, we treat it as a single item + // might be ?s ?p ?o + shardLookups.add(sp); + } else { + Var subjectVar = sp.getSubjectVar(); + if (VAR_COMPARATOR.compare(current, subjectVar) != 0) { + current = subjectVar; + shardLookupCurrent = new ShardSubjectLookup(current); + populateLookup(shardLookupCurrent); + shardLookups.add(shardLookupCurrent); + } + shardLookupCurrent.addPredicateObjectPair(sp.getPredicateVar(), sp.getObjectVar()); + } + } + + int i = 0; + Collections.sort(shardLookups, STATS_SHARD_COMPARATOR); + TupleExpr replacement = shardLookups.get(i); + for (i++; i < shardLookups.size(); i++) { + replacement = new Join(replacement, shardLookups.get(i)); + } + + node.replaceWith(replacement); + } + + @Override + public void meet(StatementPattern node) throws RuntimeException { + ShardSubjectLookup lookup = new ShardSubjectLookup(node.getSubjectVar()); + lookup.addPredicateObjectPair(node.getPredicateVar(), node.getObjectVar()); + populateLookup(lookup); + node.replaceWith(lookup); + } + } + + protected <L extends List<StatementPattern>> L getJoinArgs(TupleExpr tupleExpr, L joinArgs) { + if (tupleExpr instanceof Join) { + Join join = 
(Join) tupleExpr; + getJoinArgs(join.getLeftArg(), joinArgs); + getJoinArgs(join.getRightArg(), joinArgs); + } else if (tupleExpr instanceof StatementPattern) { + joinArgs.add((StatementPattern) tupleExpr); + } + + return joinArgs; + } + + protected ShardSubjectLookup populateLookup(ShardSubjectLookup lookup) { + String timePredicate = conf.get(lookup.getSubject().getName() + "." + TIME_PREDICATE); + if (timePredicate != null) { + lookup.setTimePredicate(timePredicate); + lookup.setStartTimeRange(conf.get(lookup.getSubject().getName() + "." + START_BINDING)); + lookup.setEndTimeRange(conf.get(lookup.getSubject().getName() + "." + END_BINDING)); + lookup.setTimeType(TimeType.valueOf(conf.get(lookup.getSubject().getName() + "." + TIME_TYPE_PROP, TimeType.XMLDATETIME.name()))); + } + + String shardRange = conf.get(lookup.getSubject().getName() + "." + SHARDRANGE_BINDING); + if(shardRange != null) { + lookup.setShardStartTimeRange(conf.get(lookup.getSubject().getName() + "." + SHARDRANGE_START)); + lookup.setShardEndTimeRange(conf.get(lookup.getSubject().getName() + "." 
+ SHARDRANGE_END)); + } + + return lookup; + } + + protected static class SubjectComparator implements Comparator<StatementPattern> { + + @Override + public int compare(StatementPattern a, StatementPattern b) { + if (a == b) + return 0; + + if (a == null || b == null) + return 1; + + if (a.getSubjectVar().equals(b.getSubjectVar())) { + if (a.getPredicateVar().hasValue() && b.getPredicateVar().hasValue()) + return 0; + if (a.getPredicateVar().hasValue() && !b.getPredicateVar().hasValue()) + return -1; + if (!a.getPredicateVar().hasValue() && b.getPredicateVar().hasValue()) + return 1; + return 0; + } + + if (a.getSubjectVar().getValue() != null && b.getSubjectVar().getValue() != null && + a.getSubjectVar().getValue().equals(b.getSubjectVar().getValue())) + return 0; + + return 1; + } + } + + protected static class ShardLookupComparator implements Comparator<TupleExpr> { + + @Override + public int compare(TupleExpr a, TupleExpr b) { + double a_c = STATISTICS.getCount(a); + double b_c = STATISTICS.getCount(b); + double diff = a_c - b_c; + return (int) (diff / Math.abs(diff)); + } + } + + protected static class VarComparator implements Comparator<Var> { + + @Override + public int compare(Var a, Var b) { + if (a == b) + return 0; + if (a == null || b == null) + return 1; + + if (a.equals(b)) + return 0; + + if (a.getValue() != null && + b.getValue() != null && + a.getValue().equals(b.getValue())) + return 0; + + return 1; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/FilterIterator.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/FilterIterator.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/FilterIterator.java new file mode 100644 index 0000000..7da4276 --- /dev/null +++ 
b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/FilterIterator.java @@ -0,0 +1,100 @@ +package mvm.mmrts.rdf.partition.query.evaluation.select; + +import cloudbase.core.data.Key; +import cloudbase.core.data.Value; +import com.google.common.collect.Lists; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import java.util.*; + +/** + * TODO: This could be done as a filtering iterator in the Iterator Stack + */ +public class FilterIterator extends SelectIterator { + + private List<Map.Entry<Var, Var>> predObjs; + private Map<URI, Map.Entry<Var, Var>> filters = new HashMap<URI, Map.Entry<Var, Var>>(); + private List<Statement> document; + private List<Map.Entry<Var, Var>> currentPredObj; + private Var subjVar; + private List<QueryBindingSet> currentResults; + private int currentResultsIndex = 0; + + public FilterIterator(BindingSet bindings, Iterator<Map.Entry<Key, Value>> iter, Var subjVar, List<Map.Entry<Var, Var>> predObjs) throws QueryEvaluationException { + super(bindings, iter); + this.subjVar = subjVar; + this.predObjs = predObjs; + for (Map.Entry<Var, Var> predObj : this.predObjs) { + //find filtering predicates + this.filters.put((URI) predObj.getKey().getValue(), predObj); + } + } + + @Override + public boolean hasNext() throws QueryEvaluationException { + if (document != null || currentResults != null) + return true; + + return super.hasNext(); + +// boolean hasNext = super.hasNext(); +// List<Map.Entry<Var, Var>> filter = null; +// while (hasNext) { +// List<Statement> stmts = nextDocument(); +// filter = filter(stmts); +// if (filter != null && filter.size() > 0) { +// document = stmts; +// this.currentPredObj = filter; +// return true; +// } +// hasNext = super.hasNext(); +// } +// return document != null; + 
} + + @Override + public BindingSet next() throws QueryEvaluationException { + try { + if (document == null) { + document = nextDocument(); + } + if (currentResults == null) { + currentResults = populateBindingSet(document, subjVar, this.predObjs); + } + BindingSet bs = currentResults.get(currentResultsIndex); + currentResultsIndex++; + if (currentResultsIndex >= currentResults.size()) { + currentResults = null; + currentResultsIndex = 0; + document = null; + } + return bs; + } catch (Exception e) { + throw new QueryEvaluationException(e); + } + } + + /** + * @return true if the Statement is filtered + * @throws QueryEvaluationException + */ + protected List<Map.Entry<Var, Var>> filter(List<Statement> document) throws QueryEvaluationException { + List<Map.Entry<Var, Var>> foundIn = new ArrayList(); + + for (Statement st : document) { + for (Map.Entry<Var, Var> entry : this.predObjs) { + if (st.getPredicate().equals(entry.getKey().getValue())) { + foundIn.add(entry); + break; + } + } + } + return foundIn; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SelectAllIterator.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SelectAllIterator.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SelectAllIterator.java new file mode 100644 index 0000000..ebe23dc --- /dev/null +++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SelectAllIterator.java @@ -0,0 +1,54 @@ +package mvm.mmrts.rdf.partition.query.evaluation.select; + +import cloudbase.core.data.Key; +import cloudbase.core.data.Value; +import com.google.common.collect.Lists; +import org.openrdf.model.Statement; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; 
+import org.openrdf.query.algebra.Var; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Class SelectAllIterator + * Date: Jul 18, 2011 + * Time: 12:01:25 PM + */ +public class SelectAllIterator extends SelectIterator { + + private List<Map.Entry<Var, Var>> predObj; + private List<Statement> document = null; + private int index = 0; + + public SelectAllIterator(BindingSet bindings, Iterator<Map.Entry<Key, Value>> iter, Var predVar, Var objVar) throws QueryEvaluationException { + super(bindings, iter); + predObj = (List) Lists.newArrayList(new HashMap.SimpleEntry(predVar, objVar)); + } + + @Override + public boolean hasNext() throws QueryEvaluationException { + return super.hasNext() || document != null; + } + + @Override + public BindingSet next() throws QueryEvaluationException { + try { + if (document == null && super.hasNext()) { + document = nextDocument(); + } + Statement st = document.get(index); + index++; + if (index >= document.size()) { + document = null; + } + return populateBindingSet(st, predObj); + } catch (Exception e) { + throw new QueryEvaluationException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SelectIterator.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SelectIterator.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SelectIterator.java new file mode 100644 index 0000000..e6efa2b --- /dev/null +++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SelectIterator.java @@ -0,0 +1,270 @@ +package mvm.mmrts.rdf.partition.query.evaluation.select; + +import cloudbase.core.data.Key; +import cloudbase.core.data.Value; +import 
com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Multimap;
import com.google.common.collect.PeekingIterator;
import com.google.common.io.ByteStreams;
import info.aduna.iteration.CloseableIteration;
import mvm.mmrts.rdf.partition.utils.RdfIO;
import org.openrdf.model.Resource;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.impl.StatementImpl;
import org.openrdf.query.BindingSet;
import org.openrdf.query.QueryEvaluationException;
import org.openrdf.query.algebra.Var;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;
import ss.cloudbase.core.iterators.filter.CBConverter;

import java.util.*;

import static mvm.mmrts.rdf.partition.PartitionConstants.*;
import static mvm.mmrts.rdf.partition.utils.RdfIO.readStatement;

/**
 * Base iteration that decodes scanned key/value entries into RDF statements,
 * groups them into "documents" (all statements sharing a subject), and binds
 * them into {@link BindingSet}s. Subclasses decide how each document becomes
 * result bindings.
 * <p/>
 * Class SelectIterator
 * Date: Jul 18, 2011
 * Time: 12:01:25 PM
 */
public abstract class SelectIterator implements CloseableIteration<BindingSet, QueryEvaluationException> {

    protected PeekingIterator<Map.Entry<Key, Value>> iter;
    protected BindingSet bindings;
    protected CBConverter converter = new CBConverter();

    // latches to false once the scan is exhausted and stays false
    private boolean hasNext = true;

    public SelectIterator(BindingSet bindings, Iterator<Map.Entry<Key, Value>> iter) {
        this.bindings = bindings;
        this.iter = Iterators.peekingIterator(iter);
        converter.init(Collections.singletonMap(CBConverter.OPTION_VALUE_DELIMITER, VALUE_DELIMITER));
    }

    @Override
    public void close() throws QueryEvaluationException {
        // nothing to release here; the scanner is owned and closed by the caller
    }

    @Override
    public boolean hasNext() throws QueryEvaluationException {
        return statefulHasNext();
    }

    /**
     * Returns whether more entries remain; once false it never becomes true again.
     */
    protected boolean statefulHasNext() {
        hasNext = iter.hasNext() && hasNext;
        return hasNext;
    }

    /**
     * Reads the next document. An entry with a non-empty value is treated as an
     * aggregate document whose column qualifier holds the subject and whose value
     * holds the predicate/object pairs; an empty value means statements are stored
     * one per key and must be gathered by subject.
     *
     * @return all statements of the next document
     * @throws QueryEvaluationException wrapping any decode failure
     */
    protected List<Statement> nextDocument() throws QueryEvaluationException {
        try {
            Map.Entry<Key, Value> entry = iter.peek();
            Key key = entry.getKey();
            Value value = entry.getValue();

            if (value.getSize() == 0) {
                // not an aggregate document
                return nextNonAggregateDocument();
            }

            List<Statement> document = new ArrayList<Statement>();

            org.openrdf.model.Value subj = RdfIO.readValue(ByteStreams.newDataInput(key.getColumnQualifier().getBytes()), VALUE_FACTORY, FAMILY_DELIM);
            Map<String, String> map = converter.toMap(entry.getKey(), value);
            for (Map.Entry<String, String> e : map.entrySet()) {
                // each map key is "<predicate><FAMILY_DELIM><object>"
                String predObj = e.getKey();
                String[] split = predObj.split(FAMILY_DELIM_STR);
                document.add(new StatementImpl((Resource) subj, VALUE_FACTORY.createURI(split[0]),
                        RdfIO.readValue(ByteStreams.newDataInput(split[1].getBytes()), VALUE_FACTORY, FAMILY_DELIM)));
            }
            iter.next(); // consume the aggregate entry we peeked at
            return document;
        } catch (Exception e) {
            throw new QueryEvaluationException("Error retrieving document", e);
        }
    }

    /**
     * Gathers consecutive single-statement entries that share a subject into one
     * document. NOTE(review): the loop mutates current/subject in a non-obvious
     * order; the statement sequence is preserved exactly from the original.
     */
    protected List<Statement> nextNonAggregateDocument() throws QueryEvaluationException {
        try {
            List<Statement> document = new ArrayList<Statement>();
            if (!statefulHasNext())
                return document;
            Statement stmt = peekNextStatement();
            if (stmt == null)
                return document;

            Resource subject = stmt.getSubject();
            Resource current = subject;
            document.add(stmt);
            while ((current.equals(subject) && statefulHasNext())) {
                advance();
                current = subject;
                stmt = peekNextStatement();
                if (stmt != null) {
                    subject = stmt.getSubject();
                    if (subject.equals(current))
                        document.add(stmt);
                } else
                    subject = null;
            }
            return document;
        } catch (Exception e) {
            throw new QueryEvaluationException(e);
        }
    }

    /**
     * Decodes the statement at the head of the scan without consuming it.
     *
     * @return the next statement, or null when the scan is exhausted
     */
    protected Statement peekNextStatement() throws Exception {
        if (!statefulHasNext())
            return null;
        Map.Entry<Key, Value> entry = iter.peek();
        Key key = entry.getKey();
        // DOC column family stores subject-first layout; anything else is index layout
        if (DOC.equals(key.getColumnFamily()))
            return readStatement(ByteStreams.newDataInput(key.getColumnQualifier().getBytes()), VALUE_FACTORY);
        else
            return readStatement(ByteStreams.newDataInput(key.getColumnQualifier().getBytes()), VALUE_FACTORY, false);
    }

    /** Consumes the entry currently at the head of the scan. */
    protected void advance() throws Exception {
        iter.next();
    }

    @Override
    public void remove() throws QueryEvaluationException {
        // BUGFIX: previously advanced the iterator (iter.next()), which silently
        // skipped a scan entry instead of removing the last returned element.
        // Removal is meaningless for a scan-backed iteration, so refuse it.
        throw new UnsupportedOperationException("remove is not supported");
    }

    /**
     * Binds each pair's predicate/object vars against a single statement,
     * never overwriting a binding that already exists.
     */
    protected BindingSet populateBindingSet(Statement st, List<Map.Entry<Var, Var>> predObjVar) {
        QueryBindingSet result = new QueryBindingSet(bindings);
        for (Map.Entry<Var, Var> entry : predObjVar) {
            Var predVar = entry.getKey();
            Var objVar = entry.getValue();
            if (predVar != null && !result.hasBinding(predVar.getName()))
                result.addBinding(predVar.getName(), st.getPredicate());
            if (objVar != null && !result.hasBinding(objVar.getName()))
                result.addBinding(objVar.getName(), st.getObject());
        }
        return result;
    }

    /**
     * Binds the subject var (if unbound) from the document's first statement and
     * expands the result set for every concrete-predicate pair found in the
     * document. When a predicate matches several statements the result set is
     * cross-multiplied so each combination yields its own binding set.
     */
    protected List<QueryBindingSet> populateBindingSet(List<Statement> document, Var subjVar, List<Map.Entry<Var, Var>> predObjVar) {
        // index the document by predicate
        Multimap<URI, Statement> docMap = ArrayListMultimap.create();
        for (Statement st : document) {
            docMap.put(st.getPredicate(), st);
        }

        List<QueryBindingSet> results = new ArrayList<QueryBindingSet>();
        QueryBindingSet bs0 = new QueryBindingSet(bindings);

        if (document.size() > 0) {
            Statement stmt = document.get(0);
            if (subjVar != null && !bs0.hasBinding(subjVar.getName())) {
                bs0.addBinding(subjVar.getName(), stmt.getSubject());
            }
        }
        results.add(bs0);

        for (Map.Entry<Var, Var> entry : predObjVar) {
            Var predVar = entry.getKey();
            Var objVar = entry.getValue();

            // only pairs with a concrete predicate can be matched against the document
            if (predVar == null || !predVar.hasValue())
                continue;
            Collection<Statement> predSts = docMap.get((URI) predVar.getValue());

            populateBindingSets(results, predVar, objVar, predSts);
        }
        return results;
    }

    /**
     * Expands {@code results} in place so every statement in {@code stmts} gets
     * bound into its own copy of the pre-existing binding sets. NOTE(review):
     * the i/j/k bookkeeping decides which binding sets may be overwritten on
     * later passes; preserved exactly from the original implementation.
     */
    private void populateBindingSets(List<QueryBindingSet> results, Var predVar, Var objVar, Collection<Statement> stmts) {
        if (predVar == null || objVar == null || stmts == null || stmts.size() == 0)
            return;

        List<QueryBindingSet> copyOf = new ArrayList<QueryBindingSet>(results);

        int i = copyOf.size();
        int j = 0;
        for (Iterator<Statement> iter = stmts.iterator(); iter.hasNext();) {
            Statement st = iter.next();
            int k = 0;
            for (QueryBindingSet result : results) {
                if (!result.hasBinding(predVar.getName()) || k >= i) {
                    addBinding(result, predVar.getName(), st.getPredicate());
                }
                if (!result.hasBinding(objVar.getName()) || k >= i)
                    addBinding(result, objVar.getName(), st.getObject());
                k++;
            }

            i = copyOf.size() + j * copyOf.size();
            j++;

            if (iter.hasNext()) {
                // another statement follows: duplicate the original binding sets so the
                // next pass can bind it into the fresh copies
                for (QueryBindingSet copy : copyOf) {
                    results.add(new QueryBindingSet(copy));
                }
            }

        }
    }

    /** Binds name=val, replacing any existing binding of the same name. */
    private void addBinding(QueryBindingSet result, String name, org.openrdf.model.Value val) {
        if (result.hasBinding(name))
            result.removeBinding(name);
        result.addBinding(name, val);
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SubjectSelectIterator.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SubjectSelectIterator.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SubjectSelectIterator.java new file mode 100644 index 0000000..fe0fca2 --- /dev/null +++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/SubjectSelectIterator.java @@ -0,0 +1,40 @@ +//package mvm.mmrts.rdf.partition.query.evaluation.select; +// +//import cloudbase.core.data.Key; +//import cloudbase.core.data.Value; +//import org.openrdf.model.Statement; +//import org.openrdf.query.BindingSet; +//import org.openrdf.query.QueryEvaluationException; +//import org.openrdf.query.algebra.Var; +//import org.openrdf.query.algebra.evaluation.QueryBindingSet; +// +//import java.util.Iterator; +//import java.util.List; +//import java.util.Map; +// +///** +// * Class SubjectSelectIterator +// * Date: Jul 18, 2011 +// * Time: 3:38:16 PM +// */ +//public class SubjectSelectIterator extends SelectIterator { +// +// private Var subjVar; +// private List<Map.Entry<Var, Var>> select; +// +// public SubjectSelectIterator(BindingSet bindings, Iterator<Map.Entry<Key, Value>> iter, Var subjVar, List<Map.Entry<Var, Var>> select) { +// super(bindings, iter); +// this.subjVar = subjVar; +// this.select = select; +// } +// +// @Override +// public BindingSet next() throws QueryEvaluationException { +// List<Statement> document = nextDocument(); +// if(document.size() != 6) { +// System.out.println("here"); +// } +// return populateBindingSet(document, subjVar, this.select); +// +// } +//} 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/utils/DocumentIterator.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/utils/DocumentIterator.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/utils/DocumentIterator.java new file mode 100644 index 0000000..f1e6c74 --- /dev/null +++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/select/utils/DocumentIterator.java @@ -0,0 +1,107 @@ +package mvm.mmrts.rdf.partition.query.evaluation.select.utils; + +import cloudbase.core.data.Key; +import cloudbase.core.data.Value; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.io.ByteStreams; +import org.openrdf.model.Resource; +import org.openrdf.model.Statement; + +import java.util.*; + +import static mvm.mmrts.rdf.partition.PartitionConstants.DOC; +import static mvm.mmrts.rdf.partition.PartitionConstants.VALUE_FACTORY; +import static mvm.mmrts.rdf.partition.utils.RdfIO.readStatement; + +/** + * This iterator will seek forward in the underlying BatchScanner Iterator and group + * statements with the same subject. This guards against the fact that the BatchScanner can return + * statements out of order. + * <br/> + * TODO: Not the best solution. 
+ * Class DocumentIterator + * Date: Aug 29, 2011 + * Time: 4:09:16 PM + */ +public class DocumentIterator implements Iterator<List<Statement>> { + + public static final int BATCH_SIZE = 1000; + + private int batchSize = BATCH_SIZE; //will hold up to 100 subject documents + /** + * TODO: Check performance against other multi maps + */ + private ListMultimap<Resource, Statement> documents = ArrayListMultimap.create(); + //TODO: Hate having to keep track of this, expensive to constantly check the "contains" + /** + * We keep track of a queue of subjects, so that the first one in will most likely have all of its document + * in our batch before popping. This assumes also that the documents won't get larger than 1000 at the most. + */ + private LinkedList<Resource> subjects = new LinkedList<Resource>(); + + private Iterator<Map.Entry<Key, Value>> iter; + private boolean hasNext = true; + + public DocumentIterator(Iterator<Map.Entry<Key, Value>> iter) { + this(iter, BATCH_SIZE); + } + + public DocumentIterator(Iterator<Map.Entry<Key, Value>> iter, int batchSize) { + this.iter = iter; + this.batchSize = batchSize; + fillDocumentMap(); + } + + protected void fillDocumentMap() { + try { + while ((documents.size() < batchSize) && statefulHasNext()) { + Statement stmt = nextStatement(); + Resource subj = stmt.getSubject(); + documents.put(subj, stmt); + if (!subjects.contains(subj)) + subjects.add(subj); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + protected boolean statefulHasNext() { + hasNext = iter.hasNext() && hasNext; + return hasNext; + } + + protected Statement nextStatement() throws Exception { + Map.Entry<Key, Value> entry = iter.next(); + Key key = entry.getKey(); + if (DOC.equals(key.getColumnFamily())) + return readStatement(ByteStreams.newDataInput(key.getColumnQualifier().getBytes()), VALUE_FACTORY); + else + return readStatement(ByteStreams.newDataInput(key.getColumnQualifier().getBytes()), VALUE_FACTORY, false); + } + + 
@Override + public boolean hasNext() { + fillDocumentMap(); + return documents.size() > 0; + } + + @Override + public List<Statement> next() { + fillDocumentMap(); + if (subjects.size() > 0) { + Resource subject = subjects.pop(); + subjects.remove(subject); + List<Statement> doc = documents.removeAll(subject); + System.out.println(doc); + return doc; + } + return null; + } + + @Override + public void remove() { + this.next(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/operators/ShardSubjectLookup.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/operators/ShardSubjectLookup.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/operators/ShardSubjectLookup.java new file mode 100644 index 0000000..378606c --- /dev/null +++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/operators/ShardSubjectLookup.java @@ -0,0 +1,167 @@ +package mvm.mmrts.rdf.partition.query.operators; + +import mvm.mmrts.rdf.partition.PartitionConstants; +import org.openrdf.query.algebra.QueryModelNodeBase; +import org.openrdf.query.algebra.QueryModelVisitor; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.Var; + +import java.util.*; + +/** + * Here the subject is not null, but there will be a list of + * predicate/object paired vars that may or may not be null + * <p/> + * Class ShardSubjectLookup + * Date: Jul 14, 2011 + * Time: 3:32:33 PM + */ +public class ShardSubjectLookup extends QueryModelNodeBase implements TupleExpr { + + private Var subject; + private List<Map.Entry<Var, Var>> predicateObjectPairs; + + private String timePredicate; + private String startTimeRange; + private String endTimeRange; + private String shardStartTimeRange; + private String shardEndTimeRange; + private PartitionConstants.TimeType 
timeType; + + public ShardSubjectLookup(Var subject) { + this(subject, new ArrayList<Map.Entry<Var, Var>>()); + } + + public ShardSubjectLookup(Var subject, List<Map.Entry<Var, Var>> predicateObjectPairs) { + this.subject = subject.clone(); + this.predicateObjectPairs = new ArrayList<Map.Entry<Var, Var>>(predicateObjectPairs); + } + + @Override + public <X extends Exception> void visit(QueryModelVisitor<X> visitor) throws X { + visitor.meetOther(this); + } + + @Override + public <X extends Exception> void visitChildren(QueryModelVisitor<X> visitor) throws X { + visitor.meet(subject); + for (Map.Entry<Var, Var> predObj : predicateObjectPairs) { + visitor.meet(predObj.getKey()); + visitor.meet(predObj.getValue()); + } + } + + @Override + public Set<String> getBindingNames() { + return getAssuredBindingNames(); + } + + @Override + public Set<String> getAssuredBindingNames() { + Set<String> bindingNames = new HashSet<String>(8); + + if (subject != null) { + bindingNames.add(subject.getName()); + } + for (Map.Entry<Var, Var> predObj : predicateObjectPairs) { + bindingNames.add(predObj.getKey().getName()); + bindingNames.add(predObj.getValue().getName()); + } + + return bindingNames; + } + + public void addPredicateObjectPair(Var predicate, Var object) { + this.predicateObjectPairs.add(new HashMap.SimpleEntry<Var, Var>(predicate, object)); + } + + public Var getSubject() { + return subject; + } + + public void setSubject(Var subject) { + this.subject = subject; + } + + public List<Map.Entry<Var, Var>> getPredicateObjectPairs() { + return predicateObjectPairs; + } + + public void setPredicateObjectPairs(List<Map.Entry<Var, Var>> predicateObjectPairs) { + this.predicateObjectPairs = predicateObjectPairs; + } + + public String getEndTimeRange() { + return endTimeRange; + } + + public void setEndTimeRange(String endTimeRange) { + this.endTimeRange = endTimeRange; + } + + public String getStartTimeRange() { + return startTimeRange; + } + + public void setStartTimeRange(String 
startTimeRange) { + this.startTimeRange = startTimeRange; + } + + public String getTimePredicate() { + return timePredicate; + } + + public void setTimePredicate(String timePredicate) { + this.timePredicate = timePredicate; + } + + public PartitionConstants.TimeType getTimeType() { + return timeType; + } + + public void setTimeType(PartitionConstants.TimeType timeType) { + this.timeType = timeType; + } + + public String getShardStartTimeRange() { + return shardStartTimeRange; + } + + public void setShardStartTimeRange(String shardStartTimeRange) { + this.shardStartTimeRange = shardStartTimeRange; + } + + public String getShardEndTimeRange() { + return shardEndTimeRange; + } + + public void setShardEndTimeRange(String shardEndTimeRange) { + this.shardEndTimeRange = shardEndTimeRange; + } + + public ShardSubjectLookup clone() { + return (ShardSubjectLookup) super.clone(); + } + + @Override + public boolean equals(Object other) { + return other instanceof ShardSubjectLookup && super.equals(other); + } + + @Override + public int hashCode() { + return super.hashCode() ^ "ShardSubjectLookup".hashCode(); + } + + @Override + public String toString() { + return "ShardSubjectLookup{" + + "subject=" + subject + + ", predicateObjectPairs=" + predicateObjectPairs + + ", timePredicate='" + timePredicate + '\'' + + ", startTimeRange='" + startTimeRange + '\'' + + ", endTimeRange='" + endTimeRange + '\'' + + ", timeType=" + timeType + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/shard/DateHashModShardValueGenerator.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/shard/DateHashModShardValueGenerator.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/shard/DateHashModShardValueGenerator.java new file mode 100644 index 0000000..304fadf --- /dev/null +++ 
package mvm.mmrts.rdf.partition.shard;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Generates shard values of the form "yyyyMMdd_NN", where NN is
 * |obj.hashCode()| mod baseMod, spreading subjects over baseMod shards per
 * date bucket. When the object is null only the date portion is returned.
 * <p/>
 * Class DateHashModShardValueGenerator
 * Date: Jul 6, 2011
 * Time: 6:29:50 PM
 */
public class DateHashModShardValueGenerator implements ShardValueGenerator {

    protected int baseMod = 50;

    // SimpleDateFormat is NOT thread-safe; all formatting below synchronizes
    // on this instance so a shared generator can be used from multiple threads.
    protected SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
    private static final String DATE_SHARD_DELIM = "_";

    public DateHashModShardValueGenerator() {
    }

    public DateHashModShardValueGenerator(SimpleDateFormat format, int baseMod) {
        this.baseMod = baseMod;
        this.format = format;
    }

    @Override
    public String generateShardValue(Object obj) {
        return this.generateShardValue(System.currentTimeMillis(), obj);
    }

    /**
     * @param date epoch millis used for the date portion of the shard value
     * @param obj  object whose hash selects the shard bucket; may be null
     * @return "yyyyMMdd" when obj is null, otherwise "yyyyMMdd_NN"
     */
    public String generateShardValue(Long date, Object obj) {
        String day;
        // BUGFIX: guard the shared, non-thread-safe SimpleDateFormat
        synchronized (format) {
            day = format.format(new Date(date));
        }
        if (obj == null)
            return day;
        return day + DATE_SHARD_DELIM + (Math.abs(obj.hashCode() % baseMod));
    }

    public int getBaseMod() {
        return baseMod;
    }

    public void setBaseMod(int baseMod) {
        this.baseMod = baseMod;
    }

    public SimpleDateFormat getFormat() {
        return format;
    }

    public void setFormat(SimpleDateFormat format) {
        this.format = format;
    }
}
package
mvm.mmrts.rdf.partition.shard; + +/** + * Interface ShardValueGenerator + * Date: Jul 6, 2011 + * Time: 6:29:08 PM + */ +public interface ShardValueGenerator { + + public String generateShardValue(Object obj); + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/ContextsStatementImpl.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/ContextsStatementImpl.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/ContextsStatementImpl.java new file mode 100644 index 0000000..966f546 --- /dev/null +++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/ContextsStatementImpl.java @@ -0,0 +1,30 @@ +package mvm.mmrts.rdf.partition.utils; + +import org.openrdf.model.Resource; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.impl.StatementImpl; + +/** + * Class ContextsStatementImpl + * Date: Aug 5, 2011 + * Time: 7:48:56 AM + */ +public class ContextsStatementImpl extends StatementImpl { + private Resource[] contexts; + + public ContextsStatementImpl(Resource subject, URI predicate, Value object, Resource... contexts) { + super(subject, predicate, object); + this.contexts = contexts; + } + + public Resource[] getContexts() { + return contexts; + } + + @Override + public Resource getContext() { + //return first context in array + return (contexts != null && contexts.length > 0) ? 
contexts[0] : null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/CountPredObjPairs.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/CountPredObjPairs.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/CountPredObjPairs.java new file mode 100644 index 0000000..2b83c6b --- /dev/null +++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/CountPredObjPairs.java @@ -0,0 +1,39 @@ +package mvm.mmrts.rdf.partition.utils; + +import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.Var; + +import java.util.List; +import java.util.Map; + +/** + * Class CountPredObjPairs + * Date: Apr 12, 2011 + * Time: 1:31:05 PM + */ +public class CountPredObjPairs { + + public CountPredObjPairs() { + } + + public double getCount(TupleExpr expr) { + int count = 100; + if (expr instanceof ShardSubjectLookup) { + ShardSubjectLookup lookup = (ShardSubjectLookup) expr; + List<Map.Entry<Var, Var>> entries = lookup.getPredicateObjectPairs(); + count -= (lookup.getSubject().hasValue()) ? 1 : 0; + count -= (lookup.getTimePredicate() != null) ? 1 : 0; + for (Map.Entry<Var, Var> entry : entries) { + count -= (entry.getValue().hasValue() && entry.getKey().hasValue()) ? 1 : 0; + } + } else if (expr instanceof StatementPattern) { + StatementPattern sp = (StatementPattern) expr; + count -= (sp.getSubjectVar().hasValue()) ? 1 : 0; + count -= (sp.getPredicateVar().hasValue() && sp.getObjectVar().hasValue()) ? 
1 : 0; + } + return count; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/PartitionUtils.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/PartitionUtils.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/PartitionUtils.java new file mode 100644 index 0000000..3e3b024 --- /dev/null +++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/PartitionUtils.java @@ -0,0 +1,9 @@ +package mvm.mmrts.rdf.partition.utils; + +/** + * Class PartitionUtils + * Date: Jul 6, 2011 + * Time: 11:49:11 AM + */ +public class PartitionUtils { +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/RdfIO.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/RdfIO.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/RdfIO.java new file mode 100644 index 0000000..b7d6ec8 --- /dev/null +++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/utils/RdfIO.java @@ -0,0 +1,166 @@ +package mvm.mmrts.rdf.partition.utils; + +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import mvm.mmrts.rdf.partition.InvalidValueTypeMarkerRuntimeException; +import org.openrdf.model.*; +import org.openrdf.model.impl.StatementImpl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static mvm.mmrts.rdf.partition.PartitionConstants.*; + +/** + * Class RdfIO + * Date: Jul 6, 2011 + * Time: 12:13:18 PM + */ +public class RdfIO { + + public static byte[] writeStatement(Statement statement, boolean document) throws IOException { + 
if (statement == null) + return new byte[]{}; + ByteArrayDataOutput dataOut = ByteStreams.newDataOutput(); + + if (document) { + writeValue(dataOut, statement.getSubject()); + dataOut.writeByte(FAMILY_DELIM); + writeValue(dataOut, statement.getPredicate()); + dataOut.writeByte(FAMILY_DELIM); + writeValue(dataOut, statement.getObject()); +// dataOut.writeByte(FAMILY_DELIM); + } else { + //index + writeValue(dataOut, statement.getPredicate()); + dataOut.writeByte(INDEX_DELIM); + writeValue(dataOut, statement.getObject()); + dataOut.writeByte(FAMILY_DELIM); + writeValue(dataOut, statement.getSubject()); +// dataOut.writeByte(FAMILY_DELIM); + } + + return dataOut.toByteArray(); + } + + public static byte[] writeValue(Value value) throws IOException { + ByteArrayDataOutput output = ByteStreams.newDataOutput(); + writeValue(output, value); + return output.toByteArray(); + } + + public static void writeValue(ByteArrayDataOutput dataOut, Value value) throws IOException { + if (value == null || dataOut == null) + throw new IllegalArgumentException("Arguments cannot be null"); + if (value instanceof URI) { + dataOut.writeByte(URI_MARKER); + dataOut.write(value.toString().getBytes()); + } else if (value instanceof BNode) { + dataOut.writeByte(BNODE_MARKER); + dataOut.write(((BNode) value).getID().getBytes()); + } else if (value instanceof Literal) { + Literal lit = (Literal) value; + + String label = lit.getLabel(); + String language = lit.getLanguage(); + URI datatype = lit.getDatatype(); + + if (datatype != null) { + dataOut.writeByte(DATATYPE_LITERAL_MARKER); + dataOut.write(label.getBytes()); + dataOut.writeByte(DATATYPE_LITERAL_MARKER); + writeValue(dataOut, datatype); + } else if (language != null) { + dataOut.writeByte(LANG_LITERAL_MARKER); + dataOut.write(label.getBytes()); + dataOut.writeByte(LANG_LITERAL_MARKER); + dataOut.write(language.getBytes()); + } else { + dataOut.writeByte(PLAIN_LITERAL_MARKER); + dataOut.write(label.getBytes()); + } + } else { + throw new 
IllegalArgumentException("unexpected value type: " + + value.getClass()); + } + } + + public static Statement readStatement(ByteArrayDataInput dataIn, ValueFactory vf) + throws IOException { + + return readStatement(dataIn, vf, true); + } + + //TODO: This could be faster somehow, more efficient + + private static byte[] readFully(ByteArrayDataInput dataIn, byte delim) { + ByteArrayDataOutput output = ByteStreams.newDataOutput(); + try { + byte curr; + while ((curr = dataIn.readByte()) != delim) { + output.writeByte(curr); + } + } catch (IllegalStateException e) { + //end of array + } + return output.toByteArray(); + } + + public static Statement readStatement(ByteArrayDataInput dataIn, ValueFactory vf, boolean doc) + throws IOException { + + //doc order: subject/0predicate/0object + //index order: predicate/1object/0subject + byte delim = (doc) ? FAMILY_DELIM : INDEX_DELIM; + List<Value> values = new ArrayList<Value>(); + while (values.size() < 3) { + Value addThis = readValue(dataIn, vf, delim); + values.add(addThis); + delim = FAMILY_DELIM; + } + + if (doc) + return new StatementImpl((Resource) values.get(0), (URI) values.get(1), values.get(2)); + else + return new StatementImpl((Resource) values.get(2), (URI) values.get(0), values.get(1)); + } + + public static Value readValue(ByteArrayDataInput dataIn, ValueFactory vf, byte delim) throws IOException { + int valueTypeMarker; + try { + valueTypeMarker = dataIn.readByte(); + } catch (Exception e) { + throw new IOException(e); + } + Value addThis = null; + if (valueTypeMarker == URI_MARKER) { + byte[] bytes = readFully(dataIn, delim); + addThis = vf.createURI(new String(bytes)); + } else if (valueTypeMarker == BNODE_MARKER) { + byte[] bytes = readFully(dataIn, delim); + addThis = vf.createBNode(new String(bytes)); + } else if (valueTypeMarker == PLAIN_LITERAL_MARKER) { + byte[] bytes = readFully(dataIn, delim); + addThis = vf.createLiteral(new String(bytes)); + } else if (valueTypeMarker == LANG_LITERAL_MARKER) { + 
byte[] bytes = readFully(dataIn, (byte) LANG_LITERAL_MARKER); + String label = new String(bytes); + bytes = readFully(dataIn, delim); + addThis = vf.createLiteral(label, new String(bytes)); + } else if (valueTypeMarker == DATATYPE_LITERAL_MARKER) { + byte[] bytes = readFully(dataIn, (byte) DATATYPE_LITERAL_MARKER); + String label_s = new String(bytes); + if (URI_MARKER != dataIn.readByte()) { + throw new IllegalArgumentException("Expected a URI datatype here"); + } + bytes = readFully(dataIn, delim); + addThis = vf.createLiteral(label_s, vf.createURI(new String(bytes))); + } else { + throw new InvalidValueTypeMarkerRuntimeException(valueTypeMarker, "Invalid value type marker: " + + valueTypeMarker); + } + return addThis; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/resources/partitionTableLoad.cbexec ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/resources/partitionTableLoad.cbexec b/partition/partition.rdf/src/main/resources/partitionTableLoad.cbexec new file mode 100644 index 0000000..74ddbe2 --- /dev/null +++ b/partition/partition.rdf/src/main/resources/partitionTableLoad.cbexec @@ -0,0 +1,4 @@ +createtable rdfShardIndex +createtable rdfPartition +config -t rdfPartition -s table.split.threshold=3G +config -t rdfPartition -s table.compaction.major.ratio=1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/test/java/mvm/mmrts/rdf/partition/LoadPartitionData.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/test/java/mvm/mmrts/rdf/partition/LoadPartitionData.java b/partition/partition.rdf/src/test/java/mvm/mmrts/rdf/partition/LoadPartitionData.java new file mode 100644 index 0000000..d5c082a --- /dev/null +++ b/partition/partition.rdf/src/test/java/mvm/mmrts/rdf/partition/LoadPartitionData.java @@ 
package mvm.mmrts.rdf.partition;

import cloudbase.core.client.ZooKeeperInstance;
import cloudbase.core.security.ColumnVisibility;
import mvm.mmrts.rdf.partition.converter.ContextColVisConverter;
import org.openrdf.model.Resource;
import org.openrdf.model.URI;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.StatementImpl;
import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.repository.Repository;
import org.openrdf.repository.RepositoryConnection;
import org.openrdf.repository.sail.SailRepository;

import javax.xml.datatype.DatatypeFactory;
import java.util.ArrayList;
import java.util.List;

/**
 * Stand-alone sample loader that writes a small set of provenance statements
 * into the hard-coded "rdfPartition" table, exercising context-to-column-visibility
 * conversion: the statement contexts (urn:colvis#A/B/C) are mapped to a
 * Cloudbase {@link ColumnVisibility} expression by the converter installed below.
 * <p>
 * NOTE(review): connection parameters (ZooKeeper instance "stratus",
 * "stratus13:2181", root/password) are hard-coded test values.
 */
public class LoadPartitionData {

    public static final String NAMESPACE = "http://here/2010/tracked-data-provenance/ns#";//44 len
    public static final String RDF_NS = "http://www.w3.org/1999/02/22-rdf-syntax-ns#";

    static ValueFactory vf = ValueFactoryImpl.getInstance();

    /**
     * Loads the sample statements and shuts the repository down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {

            final PartitionSail store = new PartitionSail(
                    new ZooKeeperInstance("stratus", "stratus13:2181").getConnector("root", "password".getBytes()),
                    "rdfPartition");
            store.setContextColVisConverter(new ContextColVisConverter() {

                /**
                 * Builds a visibility expression by joining the local names of
                 * all URI contexts with '|'. Returns null when contexts is null.
                 */
                @Override
                public ColumnVisibility convertContexts(Resource... contexts) {
                    if (contexts == null) {
                        return null;
                    }
                    // Collect URI local names first so separators are placed only
                    // between kept entries. The original indexed loop decided on
                    // the '|' from the array position, which emitted a stray
                    // trailing or doubled '|' whenever a non-URI context appeared.
                    List<String> names = new ArrayList<String>();
                    for (Resource context : contexts) {
                        if (context instanceof URI) {
                            names.add(((URI) context).getLocalName());
                        }
                    }
                    // StringBuilder: local, single-threaded use — no need for the
                    // synchronized StringBuffer.
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < names.size(); i++) {
                        if (i > 0) {
                            sb.append("|");
                        }
                        sb.append(names.get(i));
                    }
                    return new ColumnVisibility(sb.toString());
                }
            });
            Repository myRepository = new SailRepository(store);
            myRepository.initialize();

            RepositoryConnection conn = myRepository.getConnection();
            try {
                URI A = vf.createURI("urn:colvis#A");
                URI B = vf.createURI("urn:colvis#B");
                URI C = vf.createURI("urn:colvis#C");

                // One factory serves all calendar literals; newInstance() is
                // relatively expensive and throws a checked exception.
                DatatypeFactory dtf = DatatypeFactory.newInstance();

                String uuid = "uuidAuth1";
                conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(RDF_NS, "type"), vf.createURI(NAMESPACE, "Created")), A, B, C);
                conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(NAMESPACE, "createdItem"), vf.createURI(NAMESPACE, "objectUuid1")), A, B);
                conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(NAMESPACE, "performedBy"), vf.createURI("urn:system:A")), A, B);
                conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(NAMESPACE, "stringLit"), vf.createLiteral("stringLit")), A);
                conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(NAMESPACE, "performedAt"), vf.createLiteral(dtf.newXMLGregorianCalendar(2011, 7, 12, 6, 0, 0, 0, 0))), B, C);
                conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(NAMESPACE, "reportedAt"), vf.createLiteral(dtf.newXMLGregorianCalendar(2011, 7, 12, 6, 1, 0, 0, 0))), C);
                conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(NAMESPACE, "booleanLit"), vf.createLiteral(true)));

                conn.commit();
            } finally {
                // Always release the connection, even when the load fails
                // (the original leaked it on any exception).
                conn.close();
            }

            myRepository.shutDown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
package mvm.mmrts.rdf.partition;

import cloudbase.core.client.ZooKeeperInstance;
import cloudbase.core.security.ColumnVisibility;
import mvm.mmrts.rdf.partition.converter.ContextColVisConverter;
import mvm.mmrts.rdf.partition.shard.DateHashModShardValueGenerator;
import org.openrdf.model.Resource;
import org.openrdf.model.URI;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.StatementImpl;
import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.repository.Repository;
import org.openrdf.repository.RepositoryConnection;
import org.openrdf.repository.sail.SailRepository;

/**
 * Stand-alone sample loader that writes a single statement into the hard-coded
 * "rdfPartition" table using a {@link DateHashModShardValueGenerator} with a
 * base mod of 10, and installs a context-to-column-visibility converter
 * (unused by the one context-less statement added here, but kept to match the
 * store configuration of the companion loader).
 */
public class LoadPartitionData2 {

    public static final String NAMESPACE = "http://here/2010/tracked-data-provenance/ns#";//44 len
    public static final String RDF_NS = "http://www.w3.org/1999/02/22-rdf-syntax-ns#";

    static ValueFactory vf = ValueFactoryImpl.getInstance();

    /**
     * Loads the sample statement and shuts the repository down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {

            DateHashModShardValueGenerator gen = new DateHashModShardValueGenerator();
            gen.setBaseMod(10);
            final PartitionSail store = new PartitionSail(
                    new ZooKeeperInstance("stratus", "stratus13:2181").getConnector("root", "password".getBytes()),
                    "rdfPartition", gen);
            store.setContextColVisConverter(new ContextColVisConverter() {

                /**
                 * Builds a visibility expression by joining the local names of
                 * all URI contexts with '|'. Returns null when contexts is null.
                 */
                @Override
                public ColumnVisibility convertContexts(Resource... contexts) {
                    if (contexts == null) {
                        return null;
                    }
                    // Collect URI local names first so separators are placed only
                    // between kept entries. The original indexed loop decided on
                    // the '|' from the array position, which emitted a stray
                    // trailing or doubled '|' whenever a non-URI context appeared.
                    java.util.List<String> names = new java.util.ArrayList<String>();
                    for (Resource context : contexts) {
                        if (context instanceof URI) {
                            names.add(((URI) context).getLocalName());
                        }
                    }
                    // StringBuilder: local, single-threaded use — no need for the
                    // synchronized StringBuffer.
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < names.size(); i++) {
                        if (i > 0) {
                            sb.append("|");
                        }
                        sb.append(names.get(i));
                    }
                    return new ColumnVisibility(sb.toString());
                }
            });
            Repository myRepository = new SailRepository(store);
            myRepository.initialize();

            RepositoryConnection conn = myRepository.getConnection();
            try {
                conn.add(new StatementImpl(vf.createURI("http://www.Department0.University0.edu/GraduateStudent44"), vf.createURI("urn:lubm:test#specific"), vf.createURI("urn:lubm:test#value")));

                conn.commit();
            } finally {
                // Always release the connection, even when the load fails
                // (the original leaked it on any exception).
                conn.close();
            }

            myRepository.shutDown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
javax.xml.datatype.DatatypeFactory; + +public class LoadSampleData { + + public static final String NAMESPACE = "http://here/2010/tracked-data-provenance/ns#";//44 len + public static final String RDF_NS = "http://www.w3.org/1999/02/22-rdf-syntax-ns#"; + + static ValueFactory vf = ValueFactoryImpl.getInstance(); + + /** + * @param args + */ + public static void main(String[] args) { + try { + + final PartitionSail store = new PartitionSail(new ZooKeeperInstance("stratus", "stratus13:2181").getConnector("root", "password".getBytes()), "partTest", "shardIndexTest"); + + Repository myRepository = new SailRepository(store); + myRepository.initialize(); + + RepositoryConnection conn = myRepository.getConnection(); + + String uuid = "uuidAuth1"; + conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(RDF_NS, "type"), vf.createURI(NAMESPACE, "Created"))); + conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(NAMESPACE, "createdItem"), vf.createURI(NAMESPACE, "objectUuid1"))); + conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(NAMESPACE, "performedBy"), vf.createURI("urn:system:A"))); + conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(NAMESPACE, "stringLit"), vf.createLiteral("stringLit"))); + conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(NAMESPACE, "performedAt"), vf.createLiteral(DatatypeFactory.newInstance().newXMLGregorianCalendar(2011, 7, 12, 6, 0, 0, 0, 0)))); + conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(NAMESPACE, "reportedAt"), vf.createLiteral(DatatypeFactory.newInstance().newXMLGregorianCalendar(2011, 7, 12, 6, 1, 0, 0, 0)))); + conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(NAMESPACE, "booleanLit"), vf.createLiteral(true))); + + uuid = "uuidAuth4"; + conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(RDF_NS, "type"), vf.createURI(NAMESPACE, "Created"))); + conn.add(new 
StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(NAMESPACE, "createdItem"), vf.createURI(NAMESPACE, "objectUuid1"))); + conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(NAMESPACE, "performedBy"), vf.createURI("urn:system:A"))); + conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(NAMESPACE, "stringLit"), vf.createLiteral("stringLit"))); + conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(NAMESPACE, "performedAt"), vf.createLiteral(DatatypeFactory.newInstance().newXMLGregorianCalendar(2011, 7, 12, 6, 0, 0, 0, 0)))); + conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(NAMESPACE, "reportedAt"), vf.createLiteral(DatatypeFactory.newInstance().newXMLGregorianCalendar(2011, 7, 12, 6, 1, 0, 0, 0)))); + conn.add(new StatementImpl(vf.createURI(NAMESPACE, uuid), vf.createURI(NAMESPACE, "booleanLit"), vf.createLiteral(true))); + + conn.commit(); + conn.close(); + + myRepository.shutDown(); + } catch (Exception e) { + e.printStackTrace(); + } + } + +}
