http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformer.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformer.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformer.java new file mode 100644 index 0000000..38c9ea5 --- /dev/null +++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformer.java @@ -0,0 +1,331 @@ +package mvm.mmrts.rdf.partition.mr.transform; + +import cloudbase.core.CBConstants; +import cloudbase.core.client.TableNotFoundException; +import cloudbase.core.data.Key; +import cloudbase.core.data.Range; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import mvm.rya.cloudbase.utils.input.CloudbaseBatchScannerInputFormat; +import mvm.mmrts.rdf.partition.mr.iterators.SortedEncodedRangeIterator; +import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup; +import mvm.mmrts.rdf.partition.shard.DateHashModShardValueGenerator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Var; +import ss.cloudbase.core.iterators.GMDenIntersectingIterator; +import ss.cloudbase.core.iterators.SortedRangeIterator; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static mvm.mmrts.rdf.partition.PartitionConstants.*; +import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue; + +import static mvm.mmrts.rdf.partition.mr.transform.SparqlCloudbaseIFTransformerConstants.*; + +/** + * Class SparqlCloudbaseIFTransformer + * Date: Sep 1, 2011 + * Time: 11:28:48 AM + */ +public class SparqlCloudbaseIFTransformer { + + protected Job job; + + protected String userName; + protected String pwd; + protected String instance; + protected String zk; + + protected ShardSubjectLookup lookup; +// protected Configuration configuration; + protected String table; + + protected DateHashModShardValueGenerator generator; + + public SparqlCloudbaseIFTransformer(ShardSubjectLookup lookup, Configuration configuration, Job job, String table, + String userName, String pwd, String instance, String zk) throws QueryEvaluationException { + this(lookup, configuration, job, table, userName, pwd, instance, zk, new DateHashModShardValueGenerator()); + } + + public SparqlCloudbaseIFTransformer(ShardSubjectLookup lookup, Configuration configuration, Job job, String table, + String userName, String pwd, String instance, String zk, DateHashModShardValueGenerator generator) throws QueryEvaluationException { + this.lookup = lookup; +// this.configuration = configuration; + this.table = table; + this.job = job; + this.userName = userName; + this.pwd = pwd; + this.instance = instance; + this.zk = zk; + this.generator = generator; + + this.initialize(); + } + + + public void initialize() throws QueryEvaluationException { + try { + /** + * Here we will set up the BatchScanner based on the lookup + */ + Var subject = lookup.getSubject(); + List<Map.Entry<Var, Var>> where = retrieveWhereClause(); + 
List<Map.Entry<Var, Var>> select = retrieveSelectClause(); + + //global start-end time + long start = job.getConfiguration().getLong(START_BINDING, 0); + long end = job.getConfiguration().getLong(END_BINDING, System.currentTimeMillis()); + + int whereSize = where.size() + ((!isTimeRange(lookup, job.getConfiguration())) ? 0 : 1); + + if (subject.hasValue() + && where.size() == 0 /* Not using whereSize, because we can set up the TimeRange in the scanner */ + && select.size() == 0) { + /** + * Case 1: Subject is set, but predicate, object are not. + * Return all for the subject + */ +// this.scanner = scannerForSubject((URI) subject.getValue()); +// if (this.scanner == null) { +// this.iter = new EmptyIteration(); +// return; +// } +// Map.Entry<Var, Var> predObj = lookup.getPredicateObjectPairs().get(0); +// this.iter = new SelectAllIterator(this.bindings, this.scanner.iterator(), predObj.getKey(), predObj.getValue()); + throw new UnsupportedOperationException("Query Case not supported"); + } else if (subject.hasValue() + && where.size() == 0 /* Not using whereSize, because we can set up the TimeRange in the scanner */) { + /** + * Case 2: Subject is set, and a few predicates are set, but no objects + * Return all, and filter which predicates you are interested in + */ +// this.scanner = scannerForSubject((URI) subject.getValue()); +// if (this.scanner == null) { +// this.iter = new EmptyIteration(); +// return; +// } +// this.iter = new FilterIterator(this.bindings, this.scanner.iterator(), subject, select); + throw new UnsupportedOperationException("Query Case not supported"); + } else if (subject.hasValue() + && where.size() >= 1 /* Not using whereSize, because we can set up the TimeRange in the scanner */) { + /** + * Case 2a: Subject is set, and a few predicates are set, and one object + * TODO: For now we will ignore the predicate-object filter because we do not know how to query for this + */ +// this.scanner = scannerForSubject((URI) subject.getValue()); +// if (this.scanner == null) { +// this.iter = new EmptyIteration(); +// return; +// } +// this.iter = new FilterIterator(this.bindings, this.scanner.iterator(), subject, select); + throw new UnsupportedOperationException("Query Case not supported"); + } else if (!subject.hasValue() && whereSize > 1) { + /** + * Case 3: Subject is not set, more than one where clause + */ + scannerForPredicateObject(lookup, start, end, where); + setSelectFilter(subject, select); + } else if (!subject.hasValue() && whereSize == 1) { + /** + * Case 4: No subject, only one where clause + */ + Map.Entry<Var, Var> predObj = null; + if (where.size() == 1) { + predObj = where.get(0); + } + scannerForPredicateObject(lookup, start, end, predObj); + setSelectFilter(subject, select); + } else if (!subject.hasValue() && whereSize == 0 && select.size() > 1) { + /** + * Case 5: No subject, no where (just 1 select) + */ +// this.scanner = scannerForPredicates(start, end, select); +// if (this.scanner == null) { +// this.iter = new EmptyIteration(); +// return; +// } +// this.iter = new FilterIterator(this.bindings, this.scanner.iterator(), subject, select); + throw new UnsupportedOperationException("Query Case not supported"); + } else if (!subject.hasValue() && whereSize == 0 && select.size() == 1) { + /** + * Case 5: No subject, no where (just 1 select) + */ +// cloudbase.core.client.Scanner sc = scannerForPredicate(start, end, (URI) select.get(0).getKey().getValue()); +// if (sc == null) { +// this.iter = new EmptyIteration(); +// return; +// } +// this.iter = 
new FilterIterator(this.bindings, sc.iterator(), subject, select); + throw new UnsupportedOperationException("Query Case not supported"); + } else { + throw new QueryEvaluationException("Case not supported as of yet"); + } + + } catch (Exception e) { + throw new QueryEvaluationException(e); + } + } + + protected void setSelectFilter(Var subj, List<Map.Entry<Var, Var>> select) { + List<String> selectStrs = new ArrayList<String>(); + for (Map.Entry<Var, Var> entry : select) { + Var key = entry.getKey(); + Var obj = entry.getValue(); + if (key.hasValue()) { + String pred_s = key.getValue().stringValue(); + selectStrs.add(pred_s); + job.getConfiguration().set(pred_s, obj.getName()); + } + } + job.getConfiguration().setStrings(SELECT_FILTER, selectStrs.toArray(new String[selectStrs.size()])); + job.getConfiguration().set(SUBJECT_NAME, subj.getName()); + } + + protected List<Map.Entry<Var, Var>> retrieveWhereClause() { + List<Map.Entry<Var, Var>> where = new ArrayList<Map.Entry<Var, Var>>(); + for (Map.Entry<Var, Var> entry : lookup.getPredicateObjectPairs()) { + Var pred = entry.getKey(); + Var object = entry.getValue(); + if (pred.hasValue() && object.hasValue()) { + where.add(entry); //TODO: maybe we should clone this? + } + } + return where; + } + + protected List<Map.Entry<Var, Var>> retrieveSelectClause() { + List<Map.Entry<Var, Var>> select = new ArrayList<Map.Entry<Var, Var>>(); + for (Map.Entry<Var, Var> entry : lookup.getPredicateObjectPairs()) { + Var pred = entry.getKey(); + Var object = entry.getValue(); + if (pred.hasValue() && !object.hasValue()) { + select.add(entry); //TODO: maybe we should clone this? + } + } + return select; + } + + protected void scannerForPredicateObject(ShardSubjectLookup lookup, Long start, Long end, List<Map.Entry<Var, Var>> predObjs) throws IOException, TableNotFoundException { + start = validateFillStartTime(start, lookup); + end = validateFillEndTime(end, lookup); + + int extra = 0; + + if (isTimeRange(lookup, job.getConfiguration())) { + extra += 1; + } + + Text[] queries = new Text[predObjs.size() + extra]; + for (int i = 0; i < predObjs.size(); i++) { + Map.Entry<Var, Var> predObj = predObjs.get(i); + ByteArrayDataOutput output = ByteStreams.newDataOutput(); + writeValue(output, predObj.getKey().getValue()); + output.write(INDEX_DELIM); + writeValue(output, predObj.getValue().getValue()); + queries[i] = new Text(output.toByteArray()); + } + + if (isTimeRange(lookup, job.getConfiguration())) { + queries[queries.length - 1] = new Text( + GMDenIntersectingIterator.getRangeTerm(INDEX.toString(), + getStartTimeRange(lookup, job.getConfiguration()) + , true, + getEndTimeRange(lookup, job.getConfiguration()), + true + ) + ); + } + + createBatchScannerInputFormat(); + CloudbaseBatchScannerInputFormat.setIterator(job, 20, GMDenIntersectingIterator.class.getName(), "ii"); + CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ii", GMDenIntersectingIterator.docFamilyOptionName, DOC.toString()); + CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ii", GMDenIntersectingIterator.indexFamilyOptionName, INDEX.toString()); + CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ii", GMDenIntersectingIterator.columnFamiliesOptionName, GMDenIntersectingIterator.encodeColumns(queries)); + CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ii", GMDenIntersectingIterator.OPTION_MULTI_DOC, "" + true); + + Range range = new Range( + new Key(new Text(generator.generateShardValue(start, null) + "\0")), + new Key(new Text(generator.generateShardValue(end, 
null) + "\uFFFD")) + ); + CloudbaseBatchScannerInputFormat.setRanges(job, Collections.singleton( + range + )); + } + + protected void scannerForPredicateObject(ShardSubjectLookup lookup, Long start, Long end, Map.Entry<Var, Var> predObj) throws IOException, TableNotFoundException { + start = validateFillStartTime(start, lookup); + end = validateFillEndTime(end, lookup); + + /** + * Need to use GMDen because SortedRange can't serialize non xml characters in range + * @see https://issues.apache.org/jira/browse/MAPREDUCE-109 + */ + createBatchScannerInputFormat(); + CloudbaseBatchScannerInputFormat.setIterator(job, 20, SortedEncodedRangeIterator.class.getName(), "ri"); + CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_DOC_COLF, DOC.toString()); + CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_COLF, INDEX.toString()); + CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_START_INCLUSIVE, "" + true); + CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_END_INCLUSIVE, "" + true); + CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_MULTI_DOC, "" + true); + + String lower, upper = null; + if (isTimeRange(lookup, job.getConfiguration())) { + lower = getStartTimeRange(lookup, job.getConfiguration()); + upper = getEndTimeRange(lookup, job.getConfiguration()); + } else { + + ByteArrayDataOutput output = ByteStreams.newDataOutput(); + writeValue(output, predObj.getKey().getValue()); + output.write(INDEX_DELIM); + writeValue(output, predObj.getValue().getValue()); + + lower = new String(output.toByteArray()); + upper = lower + "\01"; + } + CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_LOWER_BOUND, SortedEncodedRangeIterator.encode(lower)); + CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_UPPER_BOUND, SortedEncodedRangeIterator.encode(upper)); + + //TODO: Do we add a time predicate to this? +// bs.setScanIterators(19, FilteringIterator.class.getName(), "filteringIterator"); +// bs.setScanIteratorOption("filteringIterator", "0", TimeRangeFilter.class.getName()); +// bs.setScanIteratorOption("filteringIterator", "0." + TimeRangeFilter.TIME_RANGE_PROP, (end - start) + ""); +// bs.setScanIteratorOption("filteringIterator", "0." 
+ TimeRangeFilter.START_TIME_PROP, end + ""); + + Range range = new Range( + new Key(new Text(generator.generateShardValue(start, null) + "\0")), + new Key(new Text(generator.generateShardValue(end, null) + "\uFFFD")) + ); + CloudbaseBatchScannerInputFormat.setRanges(job, Collections.singleton( + range + )); + + } + + protected void createBatchScannerInputFormat() { + job.setInputFormatClass(CloudbaseBatchScannerInputFormat.class); + CloudbaseBatchScannerInputFormat.setInputInfo(job, userName, pwd.getBytes(), table, CBConstants.NO_AUTHS); //may need to change these auths sometime soon + CloudbaseBatchScannerInputFormat.setZooKeeperInstance(job, instance, zk); + job.setMapperClass(KeyValueToMapWrMapper.class); + job.setCombinerClass(AggregateTriplesBySubjectCombiner.class); + job.setReducerClass(AggregateTriplesBySubjectReducer.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(MapWritable.class); + job.setOutputKeyClass(LongWritable.class); + job.setOutputValueClass(MapWritable.class); + + job.getConfiguration().set("io.sort.mb", "256"); + job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false); + job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false); + } + +}
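For orientation, the transformer above is driven roughly as follows: a caller builds a Hadoop Job, optionally sets the global binding.start/binding.end window, and hands the job to the constructor, which configures the input format, iterators, shard ranges, and the map/combine/reduce pipeline. Below is a minimal, hypothetical driver sketch, not part of this commit: the ShardSubjectLookup is normally produced by the partition store's query optimizer rather than built by hand, and the table/connection parameters are the ones that appear in this commit's test code.

    package mvm.mmrts.rdf.partition.mr.transform;

    import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    import static mvm.mmrts.rdf.partition.PartitionConstants.END_BINDING;
    import static mvm.mmrts.rdf.partition.PartitionConstants.START_BINDING;

    // Hypothetical driver sketch; assumes a ShardSubjectLookup produced elsewhere.
    public class SparqlCloudbaseIFTransformerDriverSketch {

        public static void run(ShardSubjectLookup lookup) throws Exception {
            Job job = new Job(new Configuration(), "sparql-partition-scan");

            // Optional global time window; initialize() falls back to [0, now] when unset.
            job.getConfiguration().setLong(START_BINDING, 0L);
            job.getConfiguration().setLong(END_BINDING, System.currentTimeMillis());

            // The constructor runs initialize(), which selects a query case, encodes the
            // predicate/object terms into iterator options, and sets the shard-key Range,
            // mapper, combiner, and reducer on the job.
            new SparqlCloudbaseIFTransformer(lookup, job.getConfiguration(), job,
                    "partitionRdf", "root", "password", "stratus", "stratus13:2181");

            job.waitForCompletion(true);
        }
    }
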
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformerConstants.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformerConstants.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformerConstants.java new file mode 100644 index 0000000..84f83c0 --- /dev/null +++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformerConstants.java @@ -0,0 +1,12 @@ +package mvm.mmrts.rdf.partition.mr.transform; + +/** + * Class SparqlCloudbaseIFTransformerConstants + * Date: Sep 1, 2011 + * Time: 5:01:10 PM + */ +public class SparqlCloudbaseIFTransformerConstants { + public static final String PREFIX = "mvm.mmrts.rdf.partition.mr.transform."; + public static final String SELECT_FILTER = PREFIX + "select"; + public static final String SUBJECT_NAME = PREFIX + "subject"; +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatToolTest.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatToolTest.java b/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatToolTest.java new file mode 100644 index 0000000..effb9ff --- /dev/null +++ b/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatToolTest.java @@ -0,0 +1,33 @@ +package mvm.mmrts.rdf.partition.mr.compat; + +import junit.framework.TestCase; + +/** + * Class ChangeShardDateFormatToolTest + * Date: Dec 9, 2011 + * Time: 10:39:31 AM + */ +public class ChangeShardDateFormatToolTest extends TestCase { + + public void testShardDelim() throws Exception { + String dateDelim = "-"; + String shard = "2011-11-01"; + int shardIndex = shard.lastIndexOf(dateDelim); + if (shardIndex == -1) + fail(); + String date = shard.substring(0, shardIndex); + shard = shard.substring(shardIndex + 1, shard.length()); + assertEquals("2011-11", date); + assertEquals("01", shard); + + dateDelim = "_"; + shard = "20111101_33"; + shardIndex = shard.lastIndexOf(dateDelim); + if (shardIndex == -1) + fail(); + date = shard.substring(0, shardIndex); + shard = shard.substring(shardIndex + 1, shard.length()); + assertEquals("20111101", date); + assertEquals("33", shard); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToolTest.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToolTest.java b/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToolTest.java new file mode 100644 index 0000000..c279348 --- /dev/null +++ b/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToolTest.java @@ -0,0 +1,80 @@ +package mvm.mmrts.rdf.partition.mr.fileinput; + +import cloudbase.core.client.Connector; +import cloudbase.core.client.ZooKeeperInstance; +import cloudbase.core.data.ColumnUpdate; +import 
cloudbase.core.data.Mutation; +import junit.framework.TestCase; +import mvm.mmrts.rdf.partition.utils.RdfIO; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver; +import org.apache.hadoop.mrunit.types.Pair; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.StatementImpl; +import org.openrdf.model.impl.ValueFactoryImpl; + +import java.util.Collection; +import java.util.List; + +/** + * Class RdfFileInputToolTest + * Date: Aug 8, 2011 + * Time: 3:22:25 PM + */ +public class RdfFileInputToolTest extends TestCase { + + ValueFactory vf = ValueFactoryImpl.getInstance(); + + /** + * MRUnit for latest mapreduce (0.21 api) + * <p/> + * 1. Test to see if the bytes overwrite will affect the output + */ + + private Mapper<LongWritable, BytesWritable, Text, BytesWritable> mapper = new RdfFileInputToCloudbaseTool.OutSubjStmtMapper(); + private Reducer<Text, BytesWritable, Text, Mutation> reducer = new RdfFileInputToCloudbaseTool.StatementToMutationReducer(); + private MapReduceDriver<LongWritable, BytesWritable, Text, BytesWritable, Text, Mutation> driver; + + @Override + protected void setUp() throws Exception { + super.setUp(); + driver = new MapReduceDriver<LongWritable, BytesWritable, Text, BytesWritable, Text, Mutation>(mapper, reducer); + Configuration conf = new Configuration(); + conf.set(RdfFileInputToCloudbaseTool.CB_TABLE_PROP, "table"); + driver.setConfiguration(conf); + } + + public void testNormalRun() throws Exception { + StatementImpl stmt1 = new StatementImpl(vf.createURI("urn:namespace#subject"), vf.createURI("urn:namespace#pred"), vf.createLiteral("object")); + StatementImpl stmt2 = new StatementImpl(vf.createURI("urn:namespace#subject"), vf.createURI("urn:namespace#pred"), vf.createLiteral("obje")); + StatementImpl stmt3 = new StatementImpl(vf.createURI("urn:namespace#subj2"), vf.createURI("urn:namespace#pred"), vf.createLiteral("ob")); + List<Pair<Text, Mutation>> pairs = driver. + withInput(new LongWritable(1), new BytesWritable(RdfIO.writeStatement(stmt1, true))). + withInput(new LongWritable(1), new BytesWritable(RdfIO.writeStatement(stmt2, true))). + withInput(new LongWritable(1), new BytesWritable(RdfIO.writeStatement(stmt3, true))). 
+ run(); + + assertEquals(4, pairs.size()); + + ColumnUpdate update = pairs.get(0).getSecond().getUpdates().get(0); + assertEquals("event", new String(update.getColumnFamily())); + assertEquals("\07urn:namespace#subj2\0\07urn:namespace#pred\0\u0009ob", new String(update.getColumnQualifier())); + } + + public static void main(String[] args) { + try { + Connector connector = new ZooKeeperInstance("stratus", "stratus13:2181").getConnector("root", "password".getBytes()); + Collection<Text> splits = connector.tableOperations().getSplits("partitionRdf", Integer.MAX_VALUE); + System.out.println(splits.size()); + System.out.println(splits); + } catch (Exception e) { + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitionerTest.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitionerTest.java b/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitionerTest.java new file mode 100644 index 0000000..bd63f6f --- /dev/null +++ b/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitionerTest.java @@ -0,0 +1,20 @@ +package mvm.mmrts.rdf.partition.mr.fileinput.bulk; + +import junit.framework.TestCase; +import org.apache.hadoop.io.Text; + +/** + * Class EmbedKeyRangePartitionerTest + * Date: Sep 13, 2011 + * Time: 1:58:28 PM + */ +public class EmbedKeyRangePartitionerTest extends TestCase { + + public void testRetrieveEmbedKey() throws Exception { + assertEquals(new Text("hello"), EmbedKeyRangePartitioner.retrieveEmbedKey(new Text("hello\1there"))); + assertEquals(new Text("h"), EmbedKeyRangePartitioner.retrieveEmbedKey(new Text("h\1there"))); + assertEquals(new Text(""), EmbedKeyRangePartitioner.retrieveEmbedKey(new Text("\1there"))); + assertEquals(new Text("hello there"), EmbedKeyRangePartitioner.retrieveEmbedKey(new Text("hello there"))); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/pom.xml ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/pom.xml b/partition/partition.rdf/pom.xml new file mode 100644 index 0000000..2701d64 --- /dev/null +++ b/partition/partition.rdf/pom.xml @@ -0,0 +1,281 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>mvm.rya</groupId> + <artifactId>parent</artifactId> + <version>3.0.0.alpha1</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <groupId>mvm.mmrts.rdf</groupId> + <artifactId>partition.rdf</artifactId> + <version>1.0.0-SNAPSHOT</version> + <name>${project.groupId}.${project.artifactId}</name> + + <dependencies> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-runtime</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryresultio-sparqlxml</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-rio-rdfxml</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + 
<artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>sitestore.common</groupId> + <artifactId>common-query</artifactId> + </dependency> + <dependency> + <groupId>mvm.rya</groupId> + <artifactId>cloudbase.utils</artifactId> + </dependency> + + <!-- Cloudbase deps --> + <dependency> + <groupId>cloudbase</groupId> + <artifactId>cloudbase-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>zookeeper</artifactId> + </dependency> + + <!-- Test --> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <!-- Deps that are transitive but listed anyway + + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-model</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-query</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryalgebra-model</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryparser-api</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryparser-serql</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryparser-sparql</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryparser-serql</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryresultio-api</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryresultio-binary</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryresultio-sparqljson</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryresultio-text</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-repository-api</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-repository-manager</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-repository-event</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-repository-sail</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-sail-memory</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-sail-inferencer</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryalgebra-evaluation</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-repository-http</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-http-client</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-repository-contextaware</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-repository-dataset</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + 
<artifactId>sesame-http-protocol</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-rio-ntriples</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-rio-api</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-rio-n3</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-rio-trix</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-rio-turtle</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-rio-trig</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-sail-api</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-sail-nativerdf</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-sail-rdbms</artifactId> + </dependency> + <dependency> + <groupId>info.aduna.commons</groupId> + <artifactId>aduna-commons-collections</artifactId> + </dependency> + <dependency> + <groupId>info.aduna.commons</groupId> + <artifactId>aduna-commons-iteration</artifactId> + </dependency> + <dependency> + <groupId>info.aduna.commons</groupId> + <artifactId>aduna-commons-io</artifactId> + </dependency> + <dependency> + <groupId>info.aduna.commons</groupId> + <artifactId>aduna-commons-lang</artifactId> + </dependency> + <dependency> + <groupId>info.aduna.commons</groupId> + <artifactId>aduna-commons-i18n</artifactId> + </dependency> + <dependency> + <groupId>info.aduna.commons</groupId> + <artifactId>aduna-commons-concurrent</artifactId> + </dependency> + <dependency> + <groupId>info.aduna.commons</groupId> + <artifactId>aduna-commons-xml</artifactId> + </dependency> + <dependency> + <groupId>info.aduna.commons</groupId> + <artifactId>aduna-commons-text</artifactId> + </dependency> + <dependency> + <groupId>info.aduna.commons</groupId> + <artifactId>aduna-commons-net</artifactId> + </dependency> + <dependency> + <groupId>commons-dbcp</groupId> + <artifactId>commons-dbcp</artifactId> + </dependency> + <dependency> + <groupId>commons-pool</groupId> + <artifactId>commons-pool</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> --> + + </dependencies> + <repositories> + <repository> + <releases> + <enabled>true</enabled> + </releases> + <snapshots> + <enabled>false</enabled> + </snapshots> + <id>aduna-opensource.releases</id> + <name>Aduna Open Source - Maven releases</name> + <url>http://repo.aduna-software.org/maven2/releases</url> + </repository> + </repositories> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.6</source> + <target>1.6</target> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/*IntegrationTest.java + </exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/InvalidValueTypeMarkerRuntimeException.java 
---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/InvalidValueTypeMarkerRuntimeException.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/InvalidValueTypeMarkerRuntimeException.java new file mode 100644 index 0000000..0c723a1 --- /dev/null +++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/InvalidValueTypeMarkerRuntimeException.java @@ -0,0 +1,34 @@ +package mvm.mmrts.rdf.partition; + +/** + * Class InvalidValueTypeMarkerRuntimeException + * Date: Jan 7, 2011 + * Time: 12:58:27 PM + */ +public class InvalidValueTypeMarkerRuntimeException extends RuntimeException { + private int valueTypeMarker = -1; + + public InvalidValueTypeMarkerRuntimeException(int valueTypeMarker) { + super(); + this.valueTypeMarker = valueTypeMarker; + } + + public InvalidValueTypeMarkerRuntimeException(int valueTypeMarker, String s) { + super(s); + this.valueTypeMarker = valueTypeMarker; + } + + public InvalidValueTypeMarkerRuntimeException(int valueTypeMarker, String s, Throwable throwable) { + super(s, throwable); + this.valueTypeMarker = valueTypeMarker; + } + + public InvalidValueTypeMarkerRuntimeException(int valueTypeMarker, Throwable throwable) { + super(throwable); + this.valueTypeMarker = valueTypeMarker; + } + + public int getValueTypeMarker() { + return valueTypeMarker; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConnection.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConnection.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConnection.java new file mode 100644 index 0000000..83e0675 --- /dev/null +++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConnection.java @@ -0,0 +1,306 @@ +package mvm.mmrts.rdf.partition; + +import cloudbase.core.client.BatchWriter; +import cloudbase.core.client.Connector; +import cloudbase.core.client.Scanner; +import cloudbase.core.client.admin.TableOperations; +import cloudbase.core.data.Key; +import cloudbase.core.data.Mutation; +import cloudbase.core.data.Range; +import cloudbase.core.security.ColumnVisibility; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import info.aduna.iteration.CloseableIteration; +import mvm.mmrts.rdf.partition.converter.ContextColVisConverter; +import mvm.mmrts.rdf.partition.iterators.NamespaceIterator; +import mvm.mmrts.rdf.partition.query.evaluation.FilterTimeIndexVisitor; +import mvm.mmrts.rdf.partition.query.evaluation.PartitionEvaluationStrategy; +import mvm.mmrts.rdf.partition.query.evaluation.SubjectGroupingOptimizer; +import mvm.mmrts.rdf.partition.shard.ShardValueGenerator; +import mvm.mmrts.rdf.partition.utils.ContextsStatementImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.openrdf.model.*; +import org.openrdf.query.Binding; +import org.openrdf.query.BindingSet; +import org.openrdf.query.Dataset; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.QueryRoot; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.impl.EmptyBindingSet; +import org.openrdf.sail.SailException; +import org.openrdf.sail.helpers.SailConnectionBase; + +import java.util.Collection; +import java.util.Iterator; +import 
java.util.Map; + +import static mvm.mmrts.rdf.partition.PartitionConstants.*; +import static mvm.mmrts.rdf.partition.utils.RdfIO.writeStatement; +import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue; + +/** + * Class PartitionConnection + * Date: Jul 6, 2011 + * Time: 4:40:49 PM + * <p/> + * Ingest: + * Triple -> + * - <subject> <shard>: + * - <shard> event:<subject>\0<predicate>\0<object> + * - <shard> index:<predicate>\1<object>\0 + * <p/> + * Namespace -> + * - <prefix> ns:<namespace> + */ +public class PartitionConnection extends SailConnectionBase { + + private PartitionSail sail; + private BatchWriter writer; + private BatchWriter shardTableWriter; //MMRTS-148 + + private Multimap<Resource, ContextsStatementImpl> statements = HashMultimap.create(10000, 10); + + + public PartitionConnection(PartitionSail sailBase) throws SailException { + super(sailBase); + this.sail = sailBase; + this.initialize(); + } + + protected void initialize() throws SailException { + try { + Connector connector = sail.connector; + String table = sail.table; + String shardTable = sail.shardTable; + + //create these tables if they do not exist + TableOperations tableOperations = connector.tableOperations(); + boolean tableExists = tableOperations.exists(table); + if (!tableExists) + tableOperations.create(table); + + tableExists = tableOperations.exists(shardTable); + if(!tableExists) + tableOperations.create(shardTable); + + writer = connector.createBatchWriter(table, 1000000l, 60000l, 10); + shardTableWriter = connector.createBatchWriter(shardTable, 1000000l, 60000l, 10); + } catch (Exception e) { + throw new SailException(e); + } + } + + @Override + protected void closeInternal() throws SailException { + try { + writer.close(); + shardTableWriter.close(); + } catch (Exception e) { + throw new SailException(e); + } + } + + @Override + protected CloseableIteration<? 
extends BindingSet, QueryEvaluationException> evaluateInternal(TupleExpr tupleExpr, Dataset dataset, BindingSet bindingSet, boolean b) throws SailException { +// throw new UnsupportedOperationException("Query not supported"); + + if (!(tupleExpr instanceof QueryRoot)) + tupleExpr = new QueryRoot(tupleExpr); + + try { + Configuration queryConf = populateConf(bindingSet); + //timeRange filter check + tupleExpr.visit(new FilterTimeIndexVisitor(queryConf)); + + (new SubjectGroupingOptimizer(queryConf)).optimize(tupleExpr, dataset, bindingSet); + PartitionTripleSource source = new PartitionTripleSource(this.sail, queryConf); + + PartitionEvaluationStrategy strategy = new PartitionEvaluationStrategy( + source, dataset); + + return strategy.evaluate(tupleExpr, EmptyBindingSet.getInstance()); + } catch (Exception e) { + throw new SailException(e); + } + + } + + protected Configuration populateConf(BindingSet bs) { + Configuration conf = new Configuration(this.sail.conf); + + for (String bname : bs.getBindingNames()) { + conf.set(bname, bs.getValue(bname).stringValue()); + } + Binding start = bs.getBinding(START_BINDING); + if (start != null) + conf.setLong(START_BINDING, Long.parseLong(start.getValue().stringValue())); + + Binding end = bs.getBinding(END_BINDING); + if (end != null) + conf.setLong(END_BINDING, Long.parseLong(end.getValue().stringValue())); + + Binding timePredicate = bs.getBinding(TIME_PREDICATE); + if (timePredicate != null) + conf.set(TIME_PREDICATE, timePredicate.getValue().stringValue()); + + Binding timeType = bs.getBinding(TIME_TYPE_PROP); + if (timeType != null) + conf.set(TIME_TYPE_PROP, timeType.getValue().stringValue()); + else if (timePredicate != null) + conf.set(TIME_TYPE_PROP, TimeType.XMLDATETIME.name()); //default to xml datetime + + return conf; + } + + @Override + protected CloseableIteration<? extends Resource, SailException> getContextIDsInternal() throws SailException { + throw new UnsupportedOperationException("Contexts not supported"); + } + + @Override + protected CloseableIteration<? extends Statement, SailException> getStatementsInternal(Resource resource, URI uri, Value value, boolean b, Resource... resources) throws SailException { + throw new UnsupportedOperationException("Query not supported"); + } + + @Override + protected long sizeInternal(Resource... 
resources) throws SailException { + throw new UnsupportedOperationException("Size operation not supported"); + } + + @Override + protected void startTransactionInternal() throws SailException { + // no transaction support as of yet + } + + @Override + protected void commitInternal() throws SailException { + try { + ShardValueGenerator gen = sail.generator; + ContextColVisConverter contextColVisConverter = sail.contextColVisConverter; + Map<Resource, Collection<ContextsStatementImpl>> map = statements.asMap(); + for (Map.Entry<Resource, Collection<ContextsStatementImpl>> entry : map.entrySet()) { + Resource subject = entry.getKey(); + byte[] subj_bytes = writeValue(subject); + String shard = gen.generateShardValue(subject); + Text shard_txt = new Text(shard); + Collection<ContextsStatementImpl> stmts = entry.getValue(); + + /** + * Triple -> + * - <subject> <shard>: + * - <shard> event:<subject>\0<predicate>\0<object> + * - <shard> index:<predicate>\1<object>\0 + */ + Mutation m_subj = new Mutation(shard_txt); + for (ContextsStatementImpl stmt : stmts) { + Resource[] contexts = stmt.getContexts(); + ColumnVisibility vis = null; + if (contexts != null && contexts.length > 0 && contextColVisConverter != null) { + vis = contextColVisConverter.convertContexts(contexts); + } + + if (vis != null) { + m_subj.put(DOC, new Text(writeStatement(stmt, true)), vis, EMPTY_VALUE); + m_subj.put(INDEX, new Text(writeStatement(stmt, false)), vis, EMPTY_VALUE); + } else { + m_subj.put(DOC, new Text(writeStatement(stmt, true)), EMPTY_VALUE); + m_subj.put(INDEX, new Text(writeStatement(stmt, false)), EMPTY_VALUE); + } + } + + /** + * TODO: Is this right? + * If the subject does not have any authorizations specified, then anyone can access it. + * But the true authorization check will happen at the predicate/object level, which means that + * the set returned will only be what the person is authorized to see. The shard lookup table has to + * have the lowest-level authorization of all the predicate/object authorizations; otherwise, + * a user may not be able to see the correct document. + */ + Mutation m_shard = new Mutation(new Text(subj_bytes)); + m_shard.put(shard_txt, EMPTY_TXT, EMPTY_VALUE); + shardTableWriter.addMutation(m_shard); + + writer.addMutation(m_subj); + } + + writer.flush(); + shardTableWriter.flush(); + statements.clear(); + } catch (Exception e) { + throw new SailException(e); + } + } + + @Override + protected void rollbackInternal() throws SailException { + statements.clear(); + } + + @Override + protected void addStatementInternal(Resource subject, URI predicate, Value object, Resource... contexts) throws SailException { + statements.put(subject, new ContextsStatementImpl(subject, predicate, object, contexts)); + } + + @Override + protected void removeStatementsInternal(Resource resource, URI uri, Value value, Resource... contexts) throws SailException { + throw new UnsupportedOperationException("Remove not supported as of yet"); + } + + @Override + protected void clearInternal(Resource... resources) throws SailException { + throw new UnsupportedOperationException("Clear with context not supported as of yet"); + } + + @Override + protected CloseableIteration<? 
extends Namespace, SailException> getNamespacesInternal() throws SailException { + return new NamespaceIterator(sail.connector, sail.table); + } + + @Override + protected String getNamespaceInternal(String prefix) throws SailException { + try { + Scanner scanner = sail.connector.createScanner(sail.table, ALL_AUTHORIZATIONS); + scanner.setRange(new Range(new Text(prefix))); + scanner.fetchColumnFamily(NAMESPACE); + Iterator<Map.Entry<Key, cloudbase.core.data.Value>> iter = scanner.iterator(); + if (iter != null && iter.hasNext()) + return iter.next().getKey().getColumnQualifier().toString(); + } catch (Exception e) { + throw new SailException(e); + } + return null; + } + + @Override + protected void setNamespaceInternal(String prefix, String namespace) throws SailException { + /** + * Namespace -> + * - <prefix> <namespace>: + */ + + try { + Mutation m = new Mutation(new Text(prefix)); + m.put(NAMESPACE, new Text(namespace), EMPTY_VALUE); + writer.addMutation(m); + } catch (Exception e) { + throw new SailException(e); + } + } + + @Override + protected void removeNamespaceInternal + (String + s) throws SailException { + throw new UnsupportedOperationException("Namespace remove not supported"); + } + + @Override + protected void clearNamespacesInternal + () throws SailException { + throw new UnsupportedOperationException("Namespace Clear not supported"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConstants.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConstants.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConstants.java new file mode 100644 index 0000000..cb69596 --- /dev/null +++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConstants.java @@ -0,0 +1,141 @@ +package mvm.mmrts.rdf.partition; + +import cloudbase.core.CBConstants; +import cloudbase.core.data.Value; +import cloudbase.core.security.Authorizations; +import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.openrdf.model.Literal; +import org.openrdf.model.URI; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; + +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * Class PartitionConstants + * Date: Jul 6, 2011 + * Time: 12:22:55 PM + */ +public class PartitionConstants { + + public static final String PARTITION_NS = "urn:mvm.mmrts.partition.rdf/08/2011#"; + public static ValueFactory VALUE_FACTORY = ValueFactoryImpl.getInstance(); + public static URI TIMERANGE = VALUE_FACTORY.createURI(PARTITION_NS, "timeRange"); + public static URI SHARDRANGE = VALUE_FACTORY.createURI(PARTITION_NS, "shardRange"); //shardRange(subject, start, stop) in ms + public static Literal EMPTY_LITERAL = VALUE_FACTORY.createLiteral(0); + + public static final byte FAMILY_DELIM = 0; + public static final String FAMILY_DELIM_STR = "\0"; + public static final byte INDEX_DELIM = 1; + public static final String INDEX_DELIM_STR = "\1"; + + /* RECORD TYPES */ +// public static final int NAMESPACE_MARKER = 2; +// +// public static final int EXPL_TRIPLE_MARKER = 3; +// +// public static final int EXPL_QUAD_MARKER = 4; +// +// public static final int INF_TRIPLE_MARKER = 5; +// +// public static final int INF_QUAD_MARKER = 6; + + public 
static final int URI_MARKER = 7; + + public static final String URI_MARKER_STR = "\07"; + + public static final int BNODE_MARKER = 8; + + public static final int PLAIN_LITERAL_MARKER = 9; + + public static final String PLAIN_LITERAL_MARKER_STR = "\u0009"; + + public static final int LANG_LITERAL_MARKER = 10; + + public static final int DATATYPE_LITERAL_MARKER = 11; + + public static final String DATATYPE_LITERAL_MARKER_STR = "\u000B"; + + public static final int EOF_MARKER = 127; + + // public static final Authorizations ALL_AUTHORIZATIONS = new Authorizations( + // "_"); + public static final Authorizations ALL_AUTHORIZATIONS = CBConstants.NO_AUTHS; + + public static final Value EMPTY_VALUE = new Value(new byte[0]); + public static final Text EMPTY_TXT = new Text(""); + + /* Column Families and Qualifiers */ + public static final Text INDEX = new Text("index"); + public static final Text DOC = new Text("event"); + public static final Text NAMESPACE = new Text("ns"); + + /* Time constants */ + public static final String START_BINDING = "binding.start"; + public static final String END_BINDING = "binding.end"; + public static final String TIME_PREDICATE = "binding.timePredicate"; + public static final String SHARDRANGE_BINDING = "binding.shardRange"; + public static final String SHARDRANGE_START = "binding.shardRange.start"; + public static final String SHARDRANGE_END = "binding.shardRange.end"; + public static final String TIME_TYPE_PROP = "binding.timeProp"; + public static final String AUTHORIZATION_PROP = "binding.authorization"; + public static final String NUMTHREADS_PROP = "binding.numthreads"; + public static final String ALLSHARDS_PROP = "binding.allshards"; + + public static final String VALUE_DELIMITER = "\03"; + + public static final SimpleDateFormat XMLDATE = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); + + public enum TimeType { + TIMESTAMP, XMLDATETIME + } + + public static boolean isTimeRange(ShardSubjectLookup lookup, Configuration configuration) { + return (configuration.get(TIME_PREDICATE) != null) || (lookup.getTimePredicate() != null); + } + + public static Long validateFillStartTime(Long start, ShardSubjectLookup lookup) { + if (lookup.getShardStartTimeRange() != null) + return Long.parseLong(lookup.getShardStartTimeRange()); + return (start == null) ? 0L : start; + } + + public static Long validateFillEndTime(Long end, ShardSubjectLookup lookup) { + if (lookup.getShardEndTimeRange() != null) + return Long.parseLong(lookup.getShardEndTimeRange()); + return (end == null) ? 
System.currentTimeMillis() : end; + } + + public static String getStartTimeRange(ShardSubjectLookup lookup, Configuration configuration) { + String tp = configProperty(configuration, TIME_PREDICATE, lookup.getTimePredicate()); + String st = configProperty(configuration, START_BINDING, lookup.getStartTimeRange()); + TimeType tt = lookup.getTimeType(); + if (tt == null) + tt = TimeType.valueOf(configuration.get(TIME_TYPE_PROP)); + return URI_MARKER_STR + tp + INDEX_DELIM_STR + convertTime(Long.parseLong(st), tt); + } + + public static String getEndTimeRange(ShardSubjectLookup lookup, Configuration configuration) { + String tp = configProperty(configuration, TIME_PREDICATE, lookup.getTimePredicate()); + String et = configProperty(configuration, END_BINDING, lookup.getEndTimeRange()); + TimeType tt = lookup.getTimeType(); + if (tt == null) + tt = TimeType.valueOf(configuration.get(TIME_TYPE_PROP)); + return URI_MARKER_STR + tp + INDEX_DELIM_STR + convertTime(Long.parseLong(et), tt); + } + + public static String convertTime(Long timestamp, TimeType timeType) { + return (TimeType.XMLDATETIME.equals(timeType)) + ? (DATATYPE_LITERAL_MARKER_STR + XMLDATE.format(new Date(timestamp))) + : PLAIN_LITERAL_MARKER_STR + timestamp; + } + + public static String configProperty(Configuration configuration, String property, String checkValue) { + if (checkValue == null) + return configuration.get(property); + return checkValue; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionSail.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionSail.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionSail.java new file mode 100644 index 0000000..07eb411 --- /dev/null +++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionSail.java @@ -0,0 +1,122 @@ +package mvm.mmrts.rdf.partition; + +import cloudbase.core.client.CBException; +import cloudbase.core.client.CBSecurityException; +import cloudbase.core.client.Connector; +import cloudbase.core.client.ZooKeeperInstance; +import mvm.mmrts.rdf.partition.converter.ContextColVisConverter; +import mvm.mmrts.rdf.partition.shard.DateHashModShardValueGenerator; +import mvm.mmrts.rdf.partition.shard.ShardValueGenerator; +import org.apache.hadoop.conf.Configuration; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.sail.SailConnection; +import org.openrdf.sail.SailException; +import org.openrdf.sail.helpers.SailBase; + +/** + * Class PartitionSail + * Date: Jul 6, 2011 + * Time: 11:40:52 AM + */ +public class PartitionSail extends SailBase { + + protected Connector connector; + + protected String table; + //MMRTS-148 + protected String shardTable; + + protected ShardValueGenerator generator = new DateHashModShardValueGenerator(); + + protected Configuration conf = new Configuration(); + + protected ContextColVisConverter contextColVisConverter; + + public PartitionSail(Connector connector, String table) { + this(connector, table, table, null); + } + + public PartitionSail(Connector connector, String table, String shardTable) { + this(connector, table, shardTable, null); + } + + public PartitionSail(String instance, String zk, String user, String password, String table) + throws CBSecurityException, CBException { + this(instance, zk, user, password, table, (ShardValueGenerator) null); 
+ } + + public PartitionSail(String instance, String zk, String user, String password, String table, ShardValueGenerator generator) + throws CBSecurityException, CBException { + this(new ZooKeeperInstance(instance, zk).getConnector(user, password.getBytes()), table, table, generator); + } + + public PartitionSail(String instance, String zk, String user, String password, String table, String shardTable) + throws CBSecurityException, CBException { + this(instance, zk, user, password, table, shardTable, null); + } + + public PartitionSail(String instance, String zk, String user, String password, String table, String shardTable, ShardValueGenerator generator) + throws CBSecurityException, CBException { + this(new ZooKeeperInstance(instance, zk).getConnector(user, password.getBytes()), table, shardTable, generator); + } + + public PartitionSail(Connector connector, String table, ShardValueGenerator generator) { + this(connector, table, table, generator); + } + + public PartitionSail(Connector connector, String table, String shardTable, ShardValueGenerator generator) { + this.connector = connector; + this.table = table; + this.shardTable = shardTable; + if (generator != null) + this.generator = generator; + } + + @Override + protected void shutDownInternal() throws SailException { + } + + @Override + protected SailConnection getConnectionInternal() throws SailException { + return new PartitionConnection(this); + } + + @Override + public boolean isWritable() throws SailException { + return true; + } + + @Override + public ValueFactory getValueFactory() { + return ValueFactoryImpl.getInstance(); + } + + public Configuration getConf() { + return conf; + } + + public Connector getConnector() { + return connector; + } + + public ShardValueGenerator getGenerator() { + return generator; + } + + public String getTable() { + return table; + } + + public String getShardTable() { + return shardTable; + } + + public ContextColVisConverter getContextColVisConverter() { + return contextColVisConverter; + } + + public void setContextColVisConverter(ContextColVisConverter contextColVisConverter) { + this.contextColVisConverter = contextColVisConverter; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionTripleSource.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionTripleSource.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionTripleSource.java new file mode 100644 index 0000000..ca7772b --- /dev/null +++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionTripleSource.java @@ -0,0 +1,40 @@ +package mvm.mmrts.rdf.partition; + +import info.aduna.iteration.CloseableIteration; +import mvm.mmrts.rdf.partition.query.evaluation.ShardSubjectLookupStatementIterator; +import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup; +import org.apache.hadoop.conf.Configuration; +import org.openrdf.model.*; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.evaluation.TripleSource; + +/** + * Class PartitionTripleSource + * Date: Jul 18, 2011 + * Time: 10:45:06 AM + */ +public class PartitionTripleSource implements TripleSource { + private PartitionSail sail; + private Configuration configuration; + + public PartitionTripleSource(PartitionSail sail, Configuration configuration) { + this.sail = sail; + 
this.configuration = configuration; + } + + @Override + public CloseableIteration<? extends Statement, QueryEvaluationException> getStatements(Resource resource, URI uri, Value value, Resource... resources) throws QueryEvaluationException { + return null; + } + + public CloseableIteration<BindingSet, QueryEvaluationException> getStatements(ShardSubjectLookup lookup, + BindingSet bindings, Resource... contexts) throws QueryEvaluationException { + return new ShardSubjectLookupStatementIterator(sail, lookup, bindings, configuration); + } + + @Override + public ValueFactory getValueFactory() { + return PartitionConstants.VALUE_FACTORY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/converter/ContextColVisConverter.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/converter/ContextColVisConverter.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/converter/ContextColVisConverter.java new file mode 100644 index 0000000..f462e9a --- /dev/null +++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/converter/ContextColVisConverter.java @@ -0,0 +1,14 @@ +package mvm.mmrts.rdf.partition.converter; + +import cloudbase.core.security.ColumnVisibility; +import org.openrdf.model.Resource; + +/** + * Interface ContextColVisConverter + * Date: Aug 5, 2011 + * Time: 7:35:40 AM + */ +public interface ContextColVisConverter { + + public ColumnVisibility convertContexts(Resource... contexts); +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/iterators/NamespaceIterator.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/iterators/NamespaceIterator.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/iterators/NamespaceIterator.java new file mode 100644 index 0000000..fc007e9 --- /dev/null +++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/iterators/NamespaceIterator.java @@ -0,0 +1,93 @@ +package mvm.mmrts.rdf.partition.iterators; + +import cloudbase.core.client.Connector; +import cloudbase.core.client.Scanner; +import cloudbase.core.client.TableNotFoundException; +import cloudbase.core.data.Key; +import cloudbase.core.data.Value; +import info.aduna.iteration.CloseableIteration; +import org.openrdf.model.Namespace; +import org.openrdf.model.impl.NamespaceImpl; +import org.openrdf.sail.SailException; + +import java.io.IOError; +import java.util.Iterator; +import java.util.Map.Entry; + +import static mvm.mmrts.rdf.partition.PartitionConstants.*; + +//TODO: Combine with CloudbaseStoreContextTableIterator4 +public class NamespaceIterator implements + CloseableIteration<Namespace, SailException> { + + private boolean open = false; + private Iterator<Entry<Key, Value>> result; + + public NamespaceIterator(Connector connector, String table) throws SailException { + initialize(connector, table); + open = true; + } + + protected void initialize(Connector connector, String table) throws SailException { + try { + Scanner scanner = connector.createScanner(table, + ALL_AUTHORIZATIONS); + scanner.fetchColumnFamily(NAMESPACE); + result = scanner.iterator(); + } catch (TableNotFoundException e) { + throw new SailException("Exception occurred in Namespace Iterator", + e); + } + } + + @Override + 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/converter/ContextColVisConverter.java
----------------------------------------------------------------------
diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/converter/ContextColVisConverter.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/converter/ContextColVisConverter.java
new file mode 100644
index 0000000..f462e9a
--- /dev/null
+++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/converter/ContextColVisConverter.java
@@ -0,0 +1,14 @@
+package mvm.mmrts.rdf.partition.converter;
+
+import cloudbase.core.security.ColumnVisibility;
+import org.openrdf.model.Resource;
+
+/**
+ * Interface ContextColVisConverter
+ * Date: Aug 5, 2011
+ * Time: 7:35:40 AM
+ */
+public interface ContextColVisConverter {
+
+    public ColumnVisibility convertContexts(Resource... contexts);
+}
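The commit ships no implementation of this interface, so the mapping from statement contexts to Cloudbase visibility expressions is left to the deployment. One hypothetical implementation, ORing together the local names of the context URIs; the policy and class name are assumptions, not part of this commit:

    import cloudbase.core.security.ColumnVisibility;
    import mvm.mmrts.rdf.partition.converter.ContextColVisConverter;
    import org.openrdf.model.Resource;
    import org.openrdf.model.URI;

    public class LocalNameColVisConverter implements ContextColVisConverter {
        @Override
        public ColumnVisibility convertContexts(Resource... contexts) {
            if (contexts == null || contexts.length == 0)
                return new ColumnVisibility();   // empty visibility: readable by anyone
            StringBuilder expr = new StringBuilder();
            for (Resource context : contexts) {
                if (!(context instanceof URI))
                    continue;                    // skip blank-node contexts
                if (expr.length() > 0)
                    expr.append("|");            // OR the labels together
                expr.append(((URI) context).getLocalName());
            }
            return new ColumnVisibility(expr.toString());
        }
    }

An instance would presumably be registered through setContextColVisConverter(...) on PartitionSail above.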
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/iterators/NamespaceIterator.java
----------------------------------------------------------------------
diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/iterators/NamespaceIterator.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/iterators/NamespaceIterator.java
new file mode 100644
index 0000000..fc007e9
--- /dev/null
+++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/iterators/NamespaceIterator.java
@@ -0,0 +1,93 @@
+package mvm.mmrts.rdf.partition.iterators;
+
+import cloudbase.core.client.Connector;
+import cloudbase.core.client.Scanner;
+import cloudbase.core.client.TableNotFoundException;
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import info.aduna.iteration.CloseableIteration;
+import org.openrdf.model.Namespace;
+import org.openrdf.model.impl.NamespaceImpl;
+import org.openrdf.sail.SailException;
+
+import java.io.IOError;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import static mvm.mmrts.rdf.partition.PartitionConstants.*;
+
+//TODO: Combine with CloudbaseStoreContextTableIterator4
+public class NamespaceIterator implements
+        CloseableIteration<Namespace, SailException> {
+
+    private boolean open = false;
+    private Iterator<Entry<Key, Value>> result;
+
+    public NamespaceIterator(Connector connector, String table) throws SailException {
+        initialize(connector, table);
+        open = true;
+    }
+
+    protected void initialize(Connector connector, String table) throws SailException {
+        try {
+            Scanner scanner = connector.createScanner(table,
+                    ALL_AUTHORIZATIONS);
+            scanner.fetchColumnFamily(NAMESPACE);
+            result = scanner.iterator();
+        } catch (TableNotFoundException e) {
+            throw new SailException("Exception occurred in Namespace Iterator",
+                    e);
+        }
+    }
+
+    @Override
+    public void close() throws SailException {
+        try {
+            verifyIsOpen();
+            open = false;
+        } catch (IOError e) {
+            throw new SailException(e);
+        }
+    }
+
+    public void verifyIsOpen() throws SailException {
+        if (!open) {
+            throw new SailException("Iterator not open");
+        }
+    }
+
+    @Override
+    public boolean hasNext() throws SailException {
+        verifyIsOpen();
+        return result != null && result.hasNext();
+    }
+
+    @Override
+    public Namespace next() throws SailException {
+        if (hasNext()) {
+            Namespace namespace = getNamespace(result);
+            return namespace;
+        }
+        return null;
+    }
+
+    public static Namespace getNamespace(Iterator<Entry<Key, Value>> rowResults) {
+        for (; rowResults.hasNext();) {
+            Entry<Key, Value> next = rowResults.next();
+            Key key = next.getKey();
+            String cq = key.getColumnQualifier().toString();
+            return new NamespaceImpl(key.getRow().toString(), cq.toString());
+        }
+
+        return null;
+    }
+
+    @Override
+    public void remove() throws SailException {
+        next();
+    }
+
+    public boolean isOpen() {
+        return open;
+    }
+}
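A short usage sketch for the iterator, reusing a Connector and the placeholder table name from the PartitionSail example above. Note that next() returns null once the scan is exhausted rather than throwing, so hasNext() should guard every call:

    // 'connector' obtained as in the PartitionSail setup; table name is a placeholder.
    NamespaceIterator namespaces = new NamespaceIterator(connector, "rdfPartition");
    try {
        while (namespaces.hasNext()) {
            Namespace ns = namespaces.next();
            // Row holds the prefix, column qualifier holds the namespace URI.
            System.out.println(ns.getPrefix() + " -> " + ns.getName());
        }
    } finally {
        namespaces.close();
    }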
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/FilterTimeIndexVisitor.java
----------------------------------------------------------------------
diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/FilterTimeIndexVisitor.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/FilterTimeIndexVisitor.java
new file mode 100644
index 0000000..5964ea0
--- /dev/null
+++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/FilterTimeIndexVisitor.java
@@ -0,0 +1,113 @@
+package mvm.mmrts.rdf.partition.query.evaluation;
+
+import org.apache.hadoop.conf.Configuration;
+import org.openrdf.model.impl.BooleanLiteralImpl;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.*;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+
+import java.util.List;
+
+import static mvm.mmrts.rdf.partition.PartitionConstants.*;
+
+/**
+ * Class FilterTimeIndexVisitor
+ * Date: Apr 11, 2011
+ * Time: 10:16:15 PM
+ */
+public class FilterTimeIndexVisitor extends QueryModelVisitorBase {
+
+    private Configuration conf;
+
+    public FilterTimeIndexVisitor(Configuration conf) {
+        this.conf = conf;
+    }
+
+    @Override
+    public void meet(Filter node) throws Exception {
+        super.meet(node);
+
+        ValueExpr arg = node.getCondition();
+        if (arg instanceof FunctionCall) {
+            FunctionCall fc = (FunctionCall) arg;
+            if (SHARDRANGE.stringValue().equals(fc.getURI())) {
+                List<ValueExpr> valueExprs = fc.getArgs();
+                if (valueExprs.size() != 3) {
+                    throw new QueryEvaluationException("mvm:shardRange must have 3 parameters: subject to run time index on, startTime(ms), endTime(ms)");
+                }
+                ValueExpr subj = valueExprs.get(0);
+                String subj_s = null;
+                if (subj instanceof Var) {
+                    subj_s = ((Var) subj).getName();
+                } else if (subj instanceof ValueConstant) {
+                    subj_s = ((ValueConstant) subj).getValue().stringValue();
+                }
+                if (subj_s == null)
+                    return; //no changes, need to figure out what shard lookup to add this time predicate to
+
+                String startTime = ((ValueConstant) valueExprs.get(1)).getValue().stringValue();
+                String endTime = ((ValueConstant) valueExprs.get(2)).getValue().stringValue();
+
+                this.conf.set(subj_s + "." + SHARDRANGE_BINDING, "true");
+                this.conf.set(subj_s + "." + SHARDRANGE_START, startTime);
+                this.conf.set(subj_s + "." + SHARDRANGE_END, endTime);
+
+                node.setCondition(new ValueConstant(BooleanLiteralImpl.TRUE));
+            }
+            if (TIMERANGE.stringValue().equals(fc.getURI())) {
+                List<ValueExpr> valueExprs = fc.getArgs();
+                if (valueExprs.size() != 4 && valueExprs.size() != 5) {
+                    throw new QueryEvaluationException("mvm:timeRange must have 4/5 parameters: subject to run time index on, time uri to index, startTime, endTime, time type(XMLDATETIME, TIMESTAMP)");
+                }
+
+                ValueExpr subj = valueExprs.get(0);
+                String subj_s = null;
+                if (subj instanceof Var) {
+                    subj_s = ((Var) subj).getName();
+                } else if (subj instanceof ValueConstant) {
+                    subj_s = ((ValueConstant) subj).getValue().stringValue();
+                }
+                if (subj_s == null)
+                    return; //no changes, need to figure out what shard lookup to add this time predicate to
+
+                ValueConstant timeUri_s = (ValueConstant) valueExprs.get(1);
+                URIImpl timeUri = new URIImpl(timeUri_s.getValue().stringValue());
+                String startTime = ((ValueConstant) valueExprs.get(2)).getValue().stringValue();
+                String endTime = ((ValueConstant) valueExprs.get(3)).getValue().stringValue();
+                TimeType timeType = TimeType.XMLDATETIME;
+                if (valueExprs.size() > 4)
+                    timeType = TimeType.valueOf(((ValueConstant) valueExprs.get(4)).getValue().stringValue());
+
+                this.conf.set(subj_s + "." + TIME_PREDICATE, timeUri.stringValue());
+                this.conf.set(subj_s + "." + START_BINDING, startTime);
+                this.conf.set(subj_s + "." + END_BINDING, endTime);
+                this.conf.set(subj_s + "." + TIME_TYPE_PROP, timeType.name());
+
+                //not setting global times
+                //set global start-end times
+//                String startTime_global = conf.get(START_BINDING);
+//                String endTime_global = conf.get(END_BINDING);
+//                if (startTime_global != null) {
+//                    long startTime_l = Long.parseLong(startTime);
+//                    long startTime_lg = Long.parseLong(startTime_global);
+//                    if (startTime_l < startTime_lg)
+//                        conf.set(START_BINDING, startTime);
+//                } else
+//                    conf.set(START_BINDING, startTime);
+//
+//                if (endTime_global != null) {
+//                    long endTime_l = Long.parseLong(endTime);
+//                    long endTime_lg = Long.parseLong(endTime_global);
+//                    if (endTime_l > endTime_lg)
+//                        conf.set(END_BINDING, endTime);
+//                } else
+//                    conf.set(END_BINDING, endTime);
+
+                node.setCondition(new ValueConstant(BooleanLiteralImpl.TRUE));
+            }
+        }
+    }
+
+}
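The visitor's effect is easiest to see on a concrete query. In the sketch below, the mvmfn: namespace is a stand-in for the real function URI held in PartitionConstants (not shown in this excerpt), and the predicate URIs are placeholders. After the visit, the FILTER condition has been replaced with constant true, and the time-range settings sit in the Configuration under keys prefixed with the subject variable's name ("s"):

    import mvm.mmrts.rdf.partition.query.evaluation.FilterTimeIndexVisitor;
    import org.apache.hadoop.conf.Configuration;
    import org.openrdf.query.parser.ParsedQuery;
    import org.openrdf.query.parser.sparql.SPARQLParser;

    public class TimeRangeRewriteExample {
        public static void main(String[] args) throws Exception {
            String sparql =
                    "PREFIX mvmfn: <urn:mvm:funcs#>\n" +          // stand-in prefix
                    "SELECT ?s ?o\n" +
                    "WHERE {\n" +
                    "  ?s <urn:test#observed> ?o .\n" +
                    "  FILTER(mvmfn:timeRange(?s, <urn:test#createdAt>,\n" +
                    "         \"2011-01-01T00:00:00Z\", \"2011-12-31T00:00:00Z\",\n" +
                    "         \"XMLDATETIME\"))\n" +
                    "}";
            ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);

            Configuration conf = new Configuration();
            // Records s.<TIME_PREDICATE>, s.<START_BINDING>, s.<END_BINDING>,
            // s.<TIME_TYPE_PROP> and rewrites the FILTER node to constant true.
            parsed.getTupleExpr().visit(new FilterTimeIndexVisitor(conf));
        }
    }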
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/PartitionEvaluationStrategy.java
----------------------------------------------------------------------
diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/PartitionEvaluationStrategy.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/PartitionEvaluationStrategy.java
new file mode 100644
index 0000000..bf898ff
--- /dev/null
+++ b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/query/evaluation/PartitionEvaluationStrategy.java
@@ -0,0 +1,70 @@
+package mvm.mmrts.rdf.partition.query.evaluation;
+
+import cloudbase.core.client.Connector;
+import info.aduna.iteration.CloseableIteration;
+import mvm.mmrts.rdf.partition.PartitionTripleSource;
+import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.Dataset;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.QueryRoot;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.evaluation.TripleSource;
+import org.openrdf.query.algebra.evaluation.impl.EvaluationStrategyImpl;
+
+import java.util.Map;
+
+/**
+ * Class PartitionEvaluationStrategy
+ * Date: Jul 14, 2011
+ * Time: 4:10:03 PM
+ */
+public class PartitionEvaluationStrategy extends EvaluationStrategyImpl {
+
+    public PartitionEvaluationStrategy(PartitionTripleSource tripleSource, Dataset dataset) {
+        super(tripleSource, dataset);
+    }
+
+    @Override
+    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(TupleExpr expr, BindingSet bindings) throws QueryEvaluationException {
+        if (expr instanceof QueryRoot) {
+            System.out.println(expr);
+        } else if (expr instanceof ShardSubjectLookup) {
+            return this.evaluate((ShardSubjectLookup) expr, bindings);
+        }
+        return super.evaluate(expr, bindings);
+    }
+
+    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(ShardSubjectLookup lookup, BindingSet bindings) throws QueryEvaluationException {
+        if (bindings.size() > 0) {
+            Var subjVar = lookup.getSubject();
+            if (bindings.hasBinding(subjVar.getName())) {
+                subjVar.setValue(bindings.getValue(subjVar.getName()));
+            }
+            //populate the lookup
+            for (Map.Entry<Var, Var> predObj : lookup.getPredicateObjectPairs()) {
+                Var predVar = predObj.getKey();
+                Var objVar = predObj.getValue();
+
+                if (bindings.hasBinding(predVar.getName())) {
+                    predVar.setValue(bindings.getValue(predVar.getName()));
+                }
+                if (bindings.hasBinding(objVar.getName())) {
+                    objVar.setValue(bindings.getValue(objVar.getName()));
+                }
+            }
+        }
+        return ((PartitionTripleSource) tripleSource).getStatements(lookup, bindings, new Resource[0]);
+    }
+
+    @Override
+    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(StatementPattern sp, BindingSet bindings) throws QueryEvaluationException {
+        ShardSubjectLookup lookup = new ShardSubjectLookup(sp.getSubjectVar());
+        lookup.addPredicateObjectPair(sp.getPredicateVar(), sp.getObjectVar());
+        return this.evaluate((ShardSubjectLookup) lookup, bindings);
+    }
+}
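Pulling the pieces together, a rough end-to-end fragment (inside a method declared throws Exception; 'sail' comes from the PartitionSail sketch, 'sparql' is any query text, and whether a null Dataset is acceptable here is an assumption):

    Configuration conf = new Configuration();
    ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
    parsed.getTupleExpr().visit(new FilterTimeIndexVisitor(conf));   // see previous example

    PartitionTripleSource tripleSource = new PartitionTripleSource(sail, conf);
    PartitionEvaluationStrategy strategy = new PartitionEvaluationStrategy(tripleSource, null);

    CloseableIteration<BindingSet, QueryEvaluationException> iter =
            strategy.evaluate(parsed.getTupleExpr(), EmptyBindingSet.getInstance());
    try {
        while (iter.hasNext()) {
            System.out.println(iter.next());   // one BindingSet per solution
        }
    } finally {
        iter.close();
    }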