http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/BulkNtripsInputTool.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/BulkNtripsInputTool.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/BulkNtripsInputTool.java deleted file mode 100644 index fea882d..0000000 --- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/BulkNtripsInputTool.java +++ /dev/null @@ -1,326 +0,0 @@ -package mvm.mmrts.rdf.partition.mr.fileinput.bulk; - -import cloudbase.core.client.Connector; -import cloudbase.core.client.Instance; -import cloudbase.core.client.ZooKeeperInstance; -import cloudbase.core.client.mapreduce.CloudbaseFileOutputFormat; -import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner; -import cloudbase.core.data.Key; -import cloudbase.core.data.Value; -import cloudbase.core.util.TextUtil; -import com.google.common.base.Preconditions; -import mvm.rya.cloudbase.utils.bulk.KeyRangePartitioner; -import mvm.mmrts.rdf.partition.shard.DateHashModShardValueGenerator; -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.openrdf.model.Statement; -import org.openrdf.rio.RDFHandler; -import org.openrdf.rio.RDFHandlerException; -import org.openrdf.rio.RDFParser; -import org.openrdf.rio.ntriples.NTriplesParserFactory; - -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.io.PrintStream; -import java.io.StringReader; -import java.util.Collection; - -import static mvm.mmrts.rdf.partition.PartitionConstants.*; -import static mvm.mmrts.rdf.partition.utils.RdfIO.writeStatement; -import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue; - -/** - * Take large ntrips files and use MapReduce and Cloudbase - * Bulk ingest techniques to load into the table in our partition format. 
- * <p/> - * Input: NTrips file - * Map: - * - key : shard row - Text - * - value : stmt in doc triple format - Text - * Partitioner: RangePartitioner - * Reduce: - * - key : all the entries for each triple - Cloudbase Key - * Class BulkNtripsInputTool - * Date: Sep 13, 2011 - * Time: 10:00:17 AM - */ -public class BulkNtripsInputTool extends Configured implements Tool { - - private static DateHashModShardValueGenerator generator = new DateHashModShardValueGenerator(); - public static final String BASE_MOD = "baseMod"; - - @Override - public int run(String[] args) throws Exception { - Preconditions.checkArgument(args.length >= 7, "Usage: hadoop jar jarfile BulkNtripsInputTool <cb instance>" + - " <zookeepers> <username> <password> <output table> <hdfs ntrips dir> <work dir> (<shard size>)"); - - Configuration conf = getConf(); - PrintStream out = null; - try { - Job job = new Job(conf, "Bulk Ingest NTrips to Partition RDF"); - job.setJarByClass(this.getClass()); - - //setting long job - job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false); - job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false); - job.getConfiguration().set("io.sort.mb", "256"); - - job.setInputFormatClass(TextInputFormat.class); - - job.setMapperClass(ParseNtripsMapper.class); - job.setMapOutputKeyClass(Key.class); - job.setMapOutputValueClass(Value.class); - - job.setCombinerClass(OutStmtMutationsReducer.class); - job.setReducerClass(OutStmtMutationsReducer.class); - job.setOutputFormatClass(CloudbaseFileOutputFormat.class); - CloudbaseFileOutputFormat.setZooKeeperInstance(job, args[0], args[1]); - - Instance instance = new ZooKeeperInstance(args[0], args[1]); - String user = args[2]; - byte[] pass = args[3].getBytes(); - String tableName = args[4]; - String inputDir = args[5]; - String workDir = args[6]; - if(args.length > 7) { - int baseMod = Integer.parseInt(args[7]); - generator.setBaseMod(baseMod); - job.getConfiguration().setInt(BASE_MOD, baseMod); - } - - Connector connector = instance.getConnector(user, pass); - - TextInputFormat.setInputPaths(job, new Path(inputDir)); - - FileSystem fs = FileSystem.get(conf); - Path workPath = new Path(workDir); - if (fs.exists(workPath)) - fs.delete(workPath, true); - - CloudbaseFileOutputFormat.setOutputPath(job, new Path(workDir + "/files")); - - out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt")))); - - Collection<Text> splits = connector.tableOperations().getSplits(tableName, Integer.MAX_VALUE); - for (Text split : splits) - out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split)))); - - job.setNumReduceTasks(splits.size() + 1); - out.close(); - - job.setPartitionerClass(KeyRangePartitioner.class); - RangePartitioner.setSplitFile(job, workDir + "/splits.txt"); - - job.waitForCompletion(true); - - connector.tableOperations().importDirectory( - tableName, - workDir + "/files", - workDir + "/failures", - 20, - 4, - false); - - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - if (out != null) - out.close(); - } - - return 0; - } - - public static void main(String[] args) { - try { - ToolRunner.run(new Configuration(), new BulkNtripsInputTool(), args); - } catch (Exception e) { - e.printStackTrace(); - } - } - - /** - * input: ntrips format triple - * <p/> - * output: key: shard row from generator - * value: stmt in serialized format (document format) - */ - public static class ParseNtripsMapper extends Mapper<LongWritable, Text, Key, Value> { 
- private static final NTriplesParserFactory N_TRIPLES_PARSER_FACTORY = new NTriplesParserFactory(); - - private Text outputKey = new Text(); - private Text outputValue = new Text(); - private RDFParser parser; - private static byte[] EMPTY_BYTE_ARRAY = new byte[0]; - - @Override - protected void setup(final Context context) throws IOException, InterruptedException { - super.setup(context); - Configuration conf = context.getConfiguration(); - generator.setBaseMod(conf.getInt(BASE_MOD, generator.getBaseMod())); - parser = N_TRIPLES_PARSER_FACTORY.getParser(); - parser.setRDFHandler(new RDFHandler() { - - @Override - public void startRDF() throws RDFHandlerException { - - } - - @Override - public void endRDF() throws RDFHandlerException { - - } - - @Override - public void handleNamespace(String s, String s1) throws RDFHandlerException { - - } - - @Override - public void handleStatement(Statement statement) throws RDFHandlerException { - try { -// byte[] doc_serialized = writeStatement(statement, true); - Text shard = new Text(generator.generateShardValue(statement.getSubject())); - - context.write(new Key(shard, DOC, new Text(writeStatement(statement, true))), EMPTY_VALUE); - context.write(new Key(shard, INDEX, new Text(writeStatement(statement, false))), EMPTY_VALUE); - //TODO: Wish we didn't have to do this constantly, probably better to just aggregate all subjects and do it once - context.write(new Key(new Text(writeValue(statement.getSubject())), shard, EMPTY_TXT), EMPTY_VALUE); - -// outputKey.set(key); -// outputValue.set(doc_serialized); -// context.write(outputKey, outputValue); -// outputKey.set(writeValue(statement.getSubject())); -// outputValue.set(EMPTY_BYTE_ARRAY); -// context.write(outputKey, outputValue); - } catch (Exception e) { - throw new RDFHandlerException(e); - } - } - - @Override - public void handleComment(String s) throws RDFHandlerException { - - } - }); - } - - @Override - public void map(LongWritable key, Text value, Context output) - throws IOException, InterruptedException { - try { - parser.parse(new StringReader(value.toString()), ""); - } catch (Exception e) { - throw new IOException("Exception occurred parsing ntrips triple[" + value + "]"); - } - } - } - - public static class OutStmtMutationsReducer extends Reducer<Key, Value, Key, Value> { - - public void reduce(Key key, Iterable<Value> values, Context output) - throws IOException, InterruptedException { - output.write(key, EMPTY_VALUE); -// System.out.println(key); -// for (Value value : values) { -// System.out.println(value); - /** - * Each of these is a triple. - * 1. format back to statement - * 2. 
Output the doc,index key,value pairs for each triple - */ -// Statement stmt = readStatement(ByteStreams.newDataInput(value.getBytes()), VALUE_FACTORY); -// output.write(new Key(shardKey, DOC, new Text(writeStatement(stmt, true))), EMPTY_VALUE); -// output.write(new Key(shardKey, INDEX, new Text(writeStatement(stmt, false))), EMPTY_VALUE); -// //TODO: Wish we didn't have to do this constantly, probably better to just aggregate all subjects and do it once -// output.write(new Key(new Text(writeValue(stmt.getSubject())), shardKey, EMPTY_TXT), EMPTY_VALUE); -// } - } - } - - public static class EmbedKeyGroupingComparator implements RawComparator<Text> { - - public EmbedKeyGroupingComparator() { - - } - - @Override - public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, - int arg5) { - DataInputBuffer n = new DataInputBuffer(); - - Text temp1 = new Text(); - Text temp2 = new Text(); - - try { - n.reset(arg0, arg1, arg2); - temp1.readFields(n); - n.reset(arg3, arg4, arg5); - temp2.readFields(n); - } catch (IOException e) { - // TODO Auto-generated catch block - //e.printStackTrace(); - throw new RuntimeException(e); - } - - return compare(temp1, temp2); - } - - @Override - public int compare(Text a1, Text a2) { - return EmbedKeyRangePartitioner.retrieveEmbedKey(a1).compareTo(EmbedKeyRangePartitioner.retrieveEmbedKey(a2)); - } - - } - - /** - * Really it does a normal Text compare - */ - public static class EmbedKeySortComparator implements RawComparator<Text> { - - public EmbedKeySortComparator() { - - } - - @Override - public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, - int arg5) { - DataInputBuffer n = new DataInputBuffer(); - - Text temp1 = new Text(); - Text temp2 = new Text(); - - try { - n.reset(arg0, arg1, arg2); - temp1.readFields(n); - n.reset(arg3, arg4, arg5); - temp2.readFields(n); - } catch (IOException e) { - // TODO Auto-generated catch block - //e.printStackTrace(); - throw new RuntimeException(e); - } - - return compare(temp1, temp2); - } - - @Override - public int compare(Text a1, Text a2) { - return a1.compareTo(a2); - } - - } -}
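The heart of the bulk ingest wiring above is the hand-off between the target table's existing split points and the MapReduce partitioner: the tool dumps the splits into an HDFS file (Base64-encoded, one per line), points the RangePartitioner at that file so reducer boundaries line up with tablet boundaries, and finally hands the reducer output to importDirectory. The condensed sketch below isolates that wiring, reusing only calls that appear in run() above; the helper class name and the method split are hypothetical.

import cloudbase.core.client.Connector;
import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner;
import cloudbase.core.util.TextUtil;
import mvm.rya.cloudbase.utils.bulk.KeyRangePartitioner;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.BufferedOutputStream;
import java.io.PrintStream;
import java.util.Collection;

/** Hypothetical helper isolating the split-file/partitioner wiring from run() above. */
final class BulkIngestWiring {

    static void wirePartitioner(Connector connector, Job job, FileSystem fs,
                                String tableName, String workDir) throws Exception {
        // Dump the table's current split points, Base64-encoded one per line,
        // which is the format RangePartitioner.setSplitFile expects.
        Collection<Text> splits = connector.tableOperations().getSplits(tableName, Integer.MAX_VALUE);
        PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt"))));
        for (Text split : splits) {
            out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split))));
        }
        out.close();

        // One reducer per existing tablet, plus one for keys beyond the last split point.
        job.setNumReduceTasks(splits.size() + 1);
        job.setPartitionerClass(KeyRangePartitioner.class);
        RangePartitioner.setSplitFile(job, workDir + "/splits.txt");
    }

    static void importResults(Connector connector, String tableName, String workDir) throws Exception {
        // Hand the reducer output (files under workDir/files) to the tablet servers.
        connector.tableOperations().importDirectory(
                tableName, workDir + "/files", workDir + "/failures", 20, 4, false);
    }
}

Sizing the reduce phase as splits.size() + 1 means each produced file falls within a single tablet's range, which keeps the final importDirectory assignment step cheap.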
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitioner.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitioner.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitioner.java deleted file mode 100644 index f72c382..0000000 --- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitioner.java +++ /dev/null @@ -1,28 +0,0 @@ -package mvm.mmrts.rdf.partition.mr.fileinput.bulk; - -import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner; -import mvm.mmrts.rdf.partition.PartitionConstants; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; - -/** - * Class EmbedKeyRangePartitioner - * Date: Sep 13, 2011 - * Time: 1:49:35 PM - */ -public class EmbedKeyRangePartitioner extends RangePartitioner { - @Override - public int getPartition(Text key, Writable value, int numPartitions) { - Text embedKey = retrieveEmbedKey(key); - return super.getPartition(embedKey, value, numPartitions); - } - - public static Text retrieveEmbedKey(Text key) { - int split = key.find(PartitionConstants.INDEX_DELIM_STR); - if (split < 0) - return key; - Text newText = new Text(); - newText.append(key.getBytes(), 0, split); - return newText; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/iterators/SortedEncodedRangeIterator.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/iterators/SortedEncodedRangeIterator.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/iterators/SortedEncodedRangeIterator.java deleted file mode 100644 index a83d594..0000000 --- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/iterators/SortedEncodedRangeIterator.java +++ /dev/null @@ -1,45 +0,0 @@ -package mvm.mmrts.rdf.partition.mr.iterators; - -import cloudbase.core.data.Key; -import cloudbase.core.data.Value; -import cloudbase.core.iterators.IteratorEnvironment; -import cloudbase.core.iterators.SortedKeyValueIterator; -import cloudbase.core.util.TextUtil; -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.io.Text; -import ss.cloudbase.core.iterators.SortedRangeIterator; - -import java.io.IOException; -import java.util.Map; - -/** - * Class SortedEncodedRangeIterator - * Date: Sep 8, 2011 - * Time: 6:01:28 PM - */ -public class SortedEncodedRangeIterator extends SortedRangeIterator { - - @Override - public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException { - super.init(source, options, env); - if (options.containsKey(OPTION_LOWER_BOUND)) { - lower = new Text(decode(options.get(OPTION_LOWER_BOUND))); - } else { - lower = new Text("\u0000"); - } - - if (options.containsKey(OPTION_UPPER_BOUND)) { - upper = new Text(decode(options.get(OPTION_UPPER_BOUND))); - } else { - upper = new Text("\u0000"); - } - } - - public static String encode(String txt) { - return new String(Base64.encodeBase64(txt.getBytes())); - } - - public static String decode(String txt) { - return new String(Base64.decodeBase64(txt.getBytes())); - } -} 
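The Base64 wrapping above exists because the lower and upper bounds that SparqlCloudbaseIFTransformer feeds into this iterator embed \0 and \1 delimiters; a comment in that transformer notes the stock range iterator cannot serialize non-XML characters in its bounds. A minimal round-trip sketch follows; the bound string is invented for illustration, real bounds are built from RdfIO.writeValue output joined with INDEX_DELIM, and the class name is hypothetical.

import mvm.mmrts.rdf.partition.mr.iterators.SortedEncodedRangeIterator;

public class EncodedBoundExample {
    public static void main(String[] args) {
        // Illustrative bound only; real bounds come from RdfIO.writeValue plus INDEX_DELIM.
        String lower = "urn:namespace#pred" + "\u0001" + "object" + "\u0000";
        String option = SortedEncodedRangeIterator.encode(lower);    // safe to set as OPTION_LOWER_BOUND
        String restored = SortedEncodedRangeIterator.decode(option); // what init() recovers on the tablet server
        System.out.println(lower.equals(restored));                  // true
    }
}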
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectCombiner.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectCombiner.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectCombiner.java deleted file mode 100644 index e360ca7..0000000 --- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectCombiner.java +++ /dev/null @@ -1,31 +0,0 @@ -package mvm.mmrts.rdf.partition.mr.transform; - -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.Reducer; - -import java.io.IOException; -import java.util.Map; - -/** - * Since each subject is located at most on one tablet, we should be able to assume that - * no reducer is needed. The Combine phase should aggregate properly. - * <p/> - * Class AggregateTriplesBySubjectReducer - * Date: Sep 1, 2011 - * Time: 5:39:24 PM - */ -public class AggregateTriplesBySubjectCombiner extends Reducer<Text, MapWritable, Text, MapWritable> { -// private LongWritable lwout = new LongWritable(); - private MapWritable mwout = new MapWritable(); - - @Override - protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException { - for (MapWritable value : values) { - for (Map.Entry<Writable, Writable> entry : value.entrySet()) { - mwout.put(WritableUtils.clone(entry.getKey(), context.getConfiguration()), - WritableUtils.clone(entry.getValue(), context.getConfiguration())); - } - } - context.write(key, mwout); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectReducer.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectReducer.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectReducer.java deleted file mode 100644 index 2ea5fa8..0000000 --- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectReducer.java +++ /dev/null @@ -1,37 +0,0 @@ -package mvm.mmrts.rdf.partition.mr.transform; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.Reducer; - -import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import static mvm.mmrts.rdf.partition.mr.transform.SparqlCloudbaseIFTransformerConstants.SELECT_FILTER; - -/** - * Since each subject is located at most on one tablet, we should be able to assume that - * no reducer is needed. The Combine phase should aggregate properly. 
- * <p/> - * Class AggregateTriplesBySubjectReducer - * Date: Sep 1, 2011 - * Time: 5:39:24 PM - */ -public class AggregateTriplesBySubjectReducer extends Reducer<Text, MapWritable, LongWritable, MapWritable> { - private LongWritable lwout = new LongWritable(); - private MapWritable mwout = new MapWritable(); - - @Override - protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException { - for (MapWritable value : values) { - for (Map.Entry<Writable, Writable> entry : value.entrySet()) { - mwout.put(WritableUtils.clone(entry.getKey(), context.getConfiguration()), - WritableUtils.clone(entry.getValue(), context.getConfiguration())); - } - } - lwout.set(key.hashCode()); - context.write(lwout, mwout); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/KeyValueToMapWrMapper.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/KeyValueToMapWrMapper.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/KeyValueToMapWrMapper.java deleted file mode 100644 index 0630501..0000000 --- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/KeyValueToMapWrMapper.java +++ /dev/null @@ -1,78 +0,0 @@ -package mvm.mmrts.rdf.partition.mr.transform; - -import cloudbase.core.data.Key; -import cloudbase.core.data.Value; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.io.ByteStreams; -import mvm.mmrts.rdf.partition.PartitionConstants; -import mvm.mmrts.rdf.partition.utils.RdfIO; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Mapper; -import org.openrdf.model.Statement; - -import java.io.IOException; -import java.util.*; - -import static mvm.mmrts.rdf.partition.mr.transform.SparqlCloudbaseIFTransformerConstants.*; - -/** - * Will take a triple and output: <subject, predObj map> - * <p/> - * Class KeyValueToMapWrMapper - * Date: Sep 1, 2011 - * Time: 4:56:42 PM - */ -public class KeyValueToMapWrMapper extends Mapper<Key, Value, Text, MapWritable> { - -// private List<String> predicateFilter = new ArrayList<String>(); - - private Text subjNameTxt; - private Text keyout = new Text(); - private Text predout = new Text(); - private Text objout = new Text(); - - private Map<String, String> predValueName = new HashMap(); - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - //find the values to filter on - Configuration conf = context.getConfiguration(); - String[] filter = conf.getStrings(SELECT_FILTER); - if (filter != null) { - for (String predValue : filter) { - String predName = conf.get(predValue); - if (predName != null) - predValueName.put(predValue, predName); - } - } - - String subjName = conf.get(SUBJECT_NAME); - if (subjName != null) { - //not sure it will ever be null - subjNameTxt = new Text(subjName); - } - } - - @Override - protected void map(Key key, Value value, Context context) throws IOException, InterruptedException { - Statement stmt = RdfIO.readStatement(ByteStreams.newDataInput(key.getColumnQualifier().getBytes()), PartitionConstants.VALUE_FACTORY); - String predName = predValueName.get(stmt.getPredicate().stringValue()); - if (predName 
== null) - return; - - keyout.set(stmt.getSubject().stringValue()); - predout.set(predName); - objout.set(stmt.getObject().stringValue()); - MapWritable mw = new MapWritable(); - mw.put(predout, objout); - if (subjNameTxt != null) { - mw.put(subjNameTxt, keyout); - } - context.write(keyout, mw); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFJob.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFJob.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFJob.java deleted file mode 100644 index 56014f9..0000000 --- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFJob.java +++ /dev/null @@ -1,118 +0,0 @@ -package mvm.mmrts.rdf.partition.mr.transform; - -import cloudbase.core.util.ArgumentChecker; -import mvm.mmrts.rdf.partition.query.evaluation.FilterTimeIndexVisitor; -import mvm.mmrts.rdf.partition.query.evaluation.SubjectGroupingOptimizer; -import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.algebra.QueryModelNode; -import org.openrdf.query.algebra.TupleExpr; -import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; -import org.openrdf.query.parser.QueryParser; -import org.openrdf.query.parser.sparql.SPARQLParserFactory; - -/** - * Class SparqlCloudbaseIFJob - * Date: Sep 1, 2011 - * Time: 6:04:35 PM - */ -public class SparqlCloudbaseIFJob { - - private String[] queries; - private String table; - - //Cloudbase properties - private String userName; - private String pwd; - private String instance; - private String zk; - // - - private Class classOriginal; //Calling class for this job. - private String outputPath; - - public SparqlCloudbaseIFJob(String table, String userName, String pwd, String instance, String zk, - String outputPath, Class classOriginal, String... 
queries) { - ArgumentChecker.notNull(queries); - this.queries = queries; - this.table = table; - this.userName = userName; - this.pwd = pwd; - this.instance = instance; - this.zk = zk; - this.outputPath = outputPath; - this.classOriginal = classOriginal; - } - - public String[] run() throws Exception { - int count = 0; - outputPath = outputPath + "/results/"; - String[] resultsOut = new String[queries.length]; - - for (String query : queries) { - QueryParser parser = (new SPARQLParserFactory()).getParser(); - TupleExpr expr = parser.parseQuery(query, "http://www.w3.org/1999/02/22-rdf-syntax-ns#").getTupleExpr(); - - final Configuration queryConf = new Configuration(); - expr.visit(new FilterTimeIndexVisitor(queryConf)); - - (new SubjectGroupingOptimizer(queryConf)).optimize(expr, null, null); - - //make sure of only one shardlookup - expr.visit(new QueryModelVisitorBase<RuntimeException>() { - int count = 0; - - @Override - public void meetOther(QueryModelNode node) throws RuntimeException { - super.meetOther(node); - count++; - if (count > 1) - throw new IllegalArgumentException("Query can only have one subject-star lookup"); - } - }); - - final Job job = new Job(queryConf); - job.setJarByClass(classOriginal); - job.setJobName("SparqlCloudbaseIFTransformer. Query: " + ((query.length() > 32) ? (query.substring(0, 32)) : (query))); - - expr.visit(new QueryModelVisitorBase<RuntimeException>() { - @Override - public void meetOther(QueryModelNode node) throws RuntimeException { - super.meetOther(node); - - //set up CloudbaseBatchScannerInputFormat here - if (node instanceof ShardSubjectLookup) { - System.out.println("Lookup: " + node); - try { - new SparqlCloudbaseIFTransformer((ShardSubjectLookup) node, queryConf, job, table, - userName, pwd, instance, zk); - } catch (QueryEvaluationException e) { - e.printStackTrace(); - } - } - } - }); - - - String resultOutPath = outputPath + "/result-" + count; - resultsOut[count] = resultOutPath; - Path outputDir = new Path(resultOutPath); - FileSystem dfs = FileSystem.get(outputDir.toUri(), queryConf); - if (dfs.exists(outputDir)) - dfs.delete(outputDir, true); - job.setOutputFormatClass(SequenceFileOutputFormat.class); - SequenceFileOutputFormat.setOutputPath(job, outputDir); - - - // Submit the job - job.waitForCompletion(true); - count++; - } - return resultsOut; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformer.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformer.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformer.java deleted file mode 100644 index 38c9ea5..0000000 --- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformer.java +++ /dev/null @@ -1,331 +0,0 @@ -package mvm.mmrts.rdf.partition.mr.transform; - -import cloudbase.core.CBConstants; -import cloudbase.core.client.TableNotFoundException; -import cloudbase.core.data.Key; -import cloudbase.core.data.Range; -import com.google.common.io.ByteArrayDataOutput; -import com.google.common.io.ByteStreams; -import mvm.rya.cloudbase.utils.input.CloudbaseBatchScannerInputFormat; -import mvm.mmrts.rdf.partition.mr.iterators.SortedEncodedRangeIterator; -import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup; 
-import mvm.mmrts.rdf.partition.shard.DateHashModShardValueGenerator; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.algebra.Var; -import ss.cloudbase.core.iterators.GMDenIntersectingIterator; -import ss.cloudbase.core.iterators.SortedRangeIterator; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import static mvm.mmrts.rdf.partition.PartitionConstants.*; -import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue; - -import static mvm.mmrts.rdf.partition.mr.transform.SparqlCloudbaseIFTransformerConstants.*; - -/** - * Class SparqlCloudbaseIFTransformer - * Date: Sep 1, 2011 - * Time: 11:28:48 AM - */ -public class SparqlCloudbaseIFTransformer { - - protected Job job; - - protected String userName; - protected String pwd; - protected String instance; - protected String zk; - - protected ShardSubjectLookup lookup; -// protected Configuration configuration; - protected String table; - - protected DateHashModShardValueGenerator generator; - - public SparqlCloudbaseIFTransformer(ShardSubjectLookup lookup, Configuration configuration, Job job, String table, - String userName, String pwd, String instance, String zk) throws QueryEvaluationException { - this(lookup, configuration, job, table, userName, pwd, instance, zk, new DateHashModShardValueGenerator()); - } - - public SparqlCloudbaseIFTransformer(ShardSubjectLookup lookup, Configuration configuration, Job job, String table, - String userName, String pwd, String instance, String zk, DateHashModShardValueGenerator generator) throws QueryEvaluationException { - this.lookup = lookup; -// this.configuration = configuration; - this.table = table; - this.job = job; - this.userName = userName; - this.pwd = pwd; - this.instance = instance; - this.zk = zk; - this.generator = generator; - - this.initialize(); - } - - - public void initialize() throws QueryEvaluationException { - try { - /** - * Here we will set up the BatchScanner based on the lookup - */ - Var subject = lookup.getSubject(); - List<Map.Entry<Var, Var>> where = retrieveWhereClause(); - List<Map.Entry<Var, Var>> select = retrieveSelectClause(); - - //global start-end time - long start = job.getConfiguration().getLong(START_BINDING, 0); - long end = job.getConfiguration().getLong(END_BINDING, System.currentTimeMillis()); - - int whereSize = where.size() + ((!isTimeRange(lookup, job.getConfiguration())) ? 0 : 1); - - if (subject.hasValue() - && where.size() == 0 /* Not using whereSize, because we can set up the TimeRange in the scanner */ - && select.size() == 0) { - /** - * Case 1: Subject is set, but predicate, object are not. 
- * Return all for the subject - */ -// this.scanner = scannerForSubject((URI) subject.getValue()); -// if (this.scanner == null) { -// this.iter = new EmptyIteration(); -// return; -// } -// Map.Entry<Var, Var> predObj = lookup.getPredicateObjectPairs().get(0); -// this.iter = new SelectAllIterator(this.bindings, this.scanner.iterator(), predObj.getKey(), predObj.getValue()); - throw new UnsupportedOperationException("Query Case not supported"); - } else if (subject.hasValue() - && where.size() == 0 /* Not using whereSize, because we can set up the TimeRange in the scanner */) { - /** - * Case 2: Subject is set, and a few predicates are set, but no objects - * Return all, and filter which predicates you are interested in - */ -// this.scanner = scannerForSubject((URI) subject.getValue()); -// if (this.scanner == null) { -// this.iter = new EmptyIteration(); -// return; -// } -// this.iter = new FilterIterator(this.bindings, this.scanner.iterator(), subject, select); - throw new UnsupportedOperationException("Query Case not supported"); - } else if (subject.hasValue() - && where.size() >= 1 /* Not using whereSize, because we can set up the TimeRange in the scanner */) { - /** - * Case 2a: Subject is set, and a few predicates are set, and one object - * TODO: For now we will ignore the predicate-object filter because we do not know how to query for this - */ -// this.scanner = scannerForSubject((URI) subject.getValue()); -// if (this.scanner == null) { -// this.iter = new EmptyIteration(); -// return; -// } -// this.iter = new FilterIterator(this.bindings, this.scanner.iterator(), subject, select); - throw new UnsupportedOperationException("Query Case not supported"); - } else if (!subject.hasValue() && whereSize > 1) { - /** - * Case 3: Subject is not set, more than one where clause - */ - scannerForPredicateObject(lookup, start, end, where); - setSelectFilter(subject, select); - } else if (!subject.hasValue() && whereSize == 1) { - /** - * Case 4: No subject, only one where clause - */ - Map.Entry<Var, Var> predObj = null; - if (where.size() == 1) { - predObj = where.get(0); - } - scannerForPredicateObject(lookup, start, end, predObj); - setSelectFilter(subject, select); - } else if (!subject.hasValue() && whereSize == 0 && select.size() > 1) { - /** - * Case 5: No subject, no where (just 1 select) - */ -// this.scanner = scannerForPredicates(start, end, select); -// if (this.scanner == null) { -// this.iter = new EmptyIteration(); -// return; -// } -// this.iter = new FilterIterator(this.bindings, this.scanner.iterator(), subject, select); - throw new UnsupportedOperationException("Query Case not supported"); - } else if (!subject.hasValue() && whereSize == 0 && select.size() == 1) { - /** - * Case 5: No subject, no where (just 1 select) - */ -// cloudbase.core.client.Scanner sc = scannerForPredicate(start, end, (URI) select.get(0).getKey().getValue()); -// if (sc == null) { -// this.iter = new EmptyIteration(); -// return; -// } -// this.iter = new FilterIterator(this.bindings, sc.iterator(), subject, select); - throw new UnsupportedOperationException("Query Case not supported"); - } else { - throw new QueryEvaluationException("Case not supported as of yet"); - } - - } catch (Exception e) { - throw new QueryEvaluationException(e); - } - } - - protected void setSelectFilter(Var subj, List<Map.Entry<Var, Var>> select) { - List<String> selectStrs = new ArrayList<String>(); - for (Map.Entry<Var, Var> entry : select) { - Var key = entry.getKey(); - Var obj = entry.getValue(); - if 
(key.hasValue()) { - String pred_s = key.getValue().stringValue(); - selectStrs.add(pred_s); - job.getConfiguration().set(pred_s, obj.getName()); - } - } - job.getConfiguration().setStrings(SELECT_FILTER, selectStrs.toArray(new String[selectStrs.size()])); - job.getConfiguration().set(SUBJECT_NAME, subj.getName()); - } - - protected List<Map.Entry<Var, Var>> retrieveWhereClause() { - List<Map.Entry<Var, Var>> where = new ArrayList<Map.Entry<Var, Var>>(); - for (Map.Entry<Var, Var> entry : lookup.getPredicateObjectPairs()) { - Var pred = entry.getKey(); - Var object = entry.getValue(); - if (pred.hasValue() && object.hasValue()) { - where.add(entry); //TODO: maybe we should clone this? - } - } - return where; - } - - protected List<Map.Entry<Var, Var>> retrieveSelectClause() { - List<Map.Entry<Var, Var>> select = new ArrayList<Map.Entry<Var, Var>>(); - for (Map.Entry<Var, Var> entry : lookup.getPredicateObjectPairs()) { - Var pred = entry.getKey(); - Var object = entry.getValue(); - if (pred.hasValue() && !object.hasValue()) { - select.add(entry); //TODO: maybe we should clone this? - } - } - return select; - } - - protected void scannerForPredicateObject(ShardSubjectLookup lookup, Long start, Long end, List<Map.Entry<Var, Var>> predObjs) throws IOException, TableNotFoundException { - start = validateFillStartTime(start, lookup); - end = validateFillEndTime(end, lookup); - - int extra = 0; - - if (isTimeRange(lookup, job.getConfiguration())) { - extra += 1; - } - - Text[] queries = new Text[predObjs.size() + extra]; - for (int i = 0; i < predObjs.size(); i++) { - Map.Entry<Var, Var> predObj = predObjs.get(i); - ByteArrayDataOutput output = ByteStreams.newDataOutput(); - writeValue(output, predObj.getKey().getValue()); - output.write(INDEX_DELIM); - writeValue(output, predObj.getValue().getValue()); - queries[i] = new Text(output.toByteArray()); - } - - if (isTimeRange(lookup, job.getConfiguration())) { - queries[queries.length - 1] = new Text( - GMDenIntersectingIterator.getRangeTerm(INDEX.toString(), - getStartTimeRange(lookup, job.getConfiguration()) - , true, - getEndTimeRange(lookup, job.getConfiguration()), - true - ) - ); - } - - createBatchScannerInputFormat(); - CloudbaseBatchScannerInputFormat.setIterator(job, 20, GMDenIntersectingIterator.class.getName(), "ii"); - CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ii", GMDenIntersectingIterator.docFamilyOptionName, DOC.toString()); - CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ii", GMDenIntersectingIterator.indexFamilyOptionName, INDEX.toString()); - CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ii", GMDenIntersectingIterator.columnFamiliesOptionName, GMDenIntersectingIterator.encodeColumns(queries)); - CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ii", GMDenIntersectingIterator.OPTION_MULTI_DOC, "" + true); - - Range range = new Range( - new Key(new Text(generator.generateShardValue(start, null) + "\0")), - new Key(new Text(generator.generateShardValue(end, null) + "\uFFFD")) - ); - CloudbaseBatchScannerInputFormat.setRanges(job, Collections.singleton( - range - )); - } - - protected void scannerForPredicateObject(ShardSubjectLookup lookup, Long start, Long end, Map.Entry<Var, Var> predObj) throws IOException, TableNotFoundException { - start = validateFillStartTime(start, lookup); - end = validateFillEndTime(end, lookup); - - /** - * Need to use GMDen because SortedRange can't serialize non xml characters in range - * @see https://issues.apache.org/jira/browse/MAPREDUCE-109 - */ - 
createBatchScannerInputFormat(); - CloudbaseBatchScannerInputFormat.setIterator(job, 20, SortedEncodedRangeIterator.class.getName(), "ri"); - CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_DOC_COLF, DOC.toString()); - CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_COLF, INDEX.toString()); - CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_START_INCLUSIVE, "" + true); - CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_END_INCLUSIVE, "" + true); - CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_MULTI_DOC, "" + true); - - String lower, upper = null; - if (isTimeRange(lookup, job.getConfiguration())) { - lower = getStartTimeRange(lookup, job.getConfiguration()); - upper = getEndTimeRange(lookup, job.getConfiguration()); - } else { - - ByteArrayDataOutput output = ByteStreams.newDataOutput(); - writeValue(output, predObj.getKey().getValue()); - output.write(INDEX_DELIM); - writeValue(output, predObj.getValue().getValue()); - - lower = new String(output.toByteArray()); - upper = lower + "\01"; - } - CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_LOWER_BOUND, SortedEncodedRangeIterator.encode(lower)); - CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_UPPER_BOUND, SortedEncodedRangeIterator.encode(upper)); - - //TODO: Do we add a time predicate to this? -// bs.setScanIterators(19, FilteringIterator.class.getName(), "filteringIterator"); -// bs.setScanIteratorOption("filteringIterator", "0", TimeRangeFilter.class.getName()); -// bs.setScanIteratorOption("filteringIterator", "0." + TimeRangeFilter.TIME_RANGE_PROP, (end - start) + ""); -// bs.setScanIteratorOption("filteringIterator", "0." 
+ TimeRangeFilter.START_TIME_PROP, end + ""); - - Range range = new Range( - new Key(new Text(generator.generateShardValue(start, null) + "\0")), - new Key(new Text(generator.generateShardValue(end, null) + "\uFFFD")) - ); - CloudbaseBatchScannerInputFormat.setRanges(job, Collections.singleton( - range - )); - - } - - protected void createBatchScannerInputFormat() { - job.setInputFormatClass(CloudbaseBatchScannerInputFormat.class); - CloudbaseBatchScannerInputFormat.setInputInfo(job, userName, pwd.getBytes(), table, CBConstants.NO_AUTHS); //may need to change these auths sometime soon - CloudbaseBatchScannerInputFormat.setZooKeeperInstance(job, instance, zk); - job.setMapperClass(KeyValueToMapWrMapper.class); - job.setCombinerClass(AggregateTriplesBySubjectCombiner.class); - job.setReducerClass(AggregateTriplesBySubjectReducer.class); - - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(MapWritable.class); - job.setOutputKeyClass(LongWritable.class); - job.setOutputValueClass(MapWritable.class); - - job.getConfiguration().set("io.sort.mb", "256"); - job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false); - job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformerConstants.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformerConstants.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformerConstants.java deleted file mode 100644 index 84f83c0..0000000 --- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformerConstants.java +++ /dev/null @@ -1,12 +0,0 @@ -package mvm.mmrts.rdf.partition.mr.transform; - -/** - * Class SparqlCloudbaseIFTransformerConstants - * Date: Sep 1, 2011 - * Time: 5:01:10 PM - */ -public class SparqlCloudbaseIFTransformerConstants { - public static final String PREFIX = "mvm.mmrts.rdf.partition.mr.transform."; - public static final String SELECT_FILTER = PREFIX + "select"; - public static final String SUBJECT_NAME = PREFIX + "subject"; -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatToolTest.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatToolTest.java b/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatToolTest.java deleted file mode 100644 index effb9ff..0000000 --- a/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatToolTest.java +++ /dev/null @@ -1,33 +0,0 @@ -package mvm.mmrts.rdf.partition.mr.compat; - -import junit.framework.TestCase; - -/** - * Class ChangeShardDateFormatToolTest - * Date: Dec 9, 2011 - * Time: 10:39:31 AM - */ -public class ChangeShardDateFormatToolTest extends TestCase { - - public void testShardDelim() throws Exception { - String dateDelim = "-"; - String shard = "2011-11-01"; - int shardIndex = shard.lastIndexOf(dateDelim); - if (shardIndex == -1) - fail(); - String date = shard.substring(0, 
shardIndex); - shard = shard.substring(shardIndex + 1, shard.length()); - assertEquals("2011-11", date); - assertEquals("01", shard); - - dateDelim = "_"; - shard = "20111101_33"; - shardIndex = shard.lastIndexOf(dateDelim); - if (shardIndex == -1) - fail(); - date = shard.substring(0, shardIndex); - shard = shard.substring(shardIndex + 1, shard.length()); - assertEquals("20111101", date); - assertEquals("33", shard); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToolTest.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToolTest.java b/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToolTest.java deleted file mode 100644 index c279348..0000000 --- a/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToolTest.java +++ /dev/null @@ -1,80 +0,0 @@ -package mvm.mmrts.rdf.partition.mr.fileinput; - -import cloudbase.core.client.Connector; -import cloudbase.core.client.ZooKeeperInstance; -import cloudbase.core.data.ColumnUpdate; -import cloudbase.core.data.Mutation; -import junit.framework.TestCase; -import mvm.mmrts.rdf.partition.utils.RdfIO; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver; -import org.apache.hadoop.mrunit.types.Pair; -import org.apache.zookeeper.ZooKeeper; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.StatementImpl; -import org.openrdf.model.impl.ValueFactoryImpl; - -import java.util.Collection; -import java.util.List; - -/** - * Class RdfFileInputToolTest - * Date: Aug 8, 2011 - * Time: 3:22:25 PM - */ -public class RdfFileInputToolTest extends TestCase { - - ValueFactory vf = ValueFactoryImpl.getInstance(); - - /** - * MRUnit for latest mapreduce (0.21 api) - * <p/> - * 1. Test to see if the bytes overwrite will affect - */ - - private Mapper<LongWritable, BytesWritable, Text, BytesWritable> mapper = new RdfFileInputToCloudbaseTool.OutSubjStmtMapper(); - private Reducer<Text, BytesWritable, Text, Mutation> reducer = new RdfFileInputToCloudbaseTool.StatementToMutationReducer(); - private MapReduceDriver<LongWritable, BytesWritable, Text, BytesWritable, Text, Mutation> driver; - - @Override - protected void setUp() throws Exception { - super.setUp(); - driver = new MapReduceDriver(mapper, reducer); - Configuration conf = new Configuration(); - conf.set(RdfFileInputToCloudbaseTool.CB_TABLE_PROP, "table"); - driver.setConfiguration(conf); - } - - public void testNormalRun() throws Exception { - StatementImpl stmt1 = new StatementImpl(vf.createURI("urn:namespace#subject"), vf.createURI("urn:namespace#pred"), vf.createLiteral("object")); - StatementImpl stmt2 = new StatementImpl(vf.createURI("urn:namespace#subject"), vf.createURI("urn:namespace#pred"), vf.createLiteral("obje")); - StatementImpl stmt3 = new StatementImpl(vf.createURI("urn:namespace#subj2"), vf.createURI("urn:namespace#pred"), vf.createLiteral("ob")); - List<Pair<Text, Mutation>> pairs = driver. - withInput(new LongWritable(1), new BytesWritable(RdfIO.writeStatement(stmt1, true))). 
- withInput(new LongWritable(1), new BytesWritable(RdfIO.writeStatement(stmt2, true))). - withInput(new LongWritable(1), new BytesWritable(RdfIO.writeStatement(stmt3, true))). - run(); - - assertEquals(4, pairs.size()); - - ColumnUpdate update = pairs.get(0).getSecond().getUpdates().get(0); - assertEquals("event", new String(update.getColumnFamily())); - assertEquals("\07urn:namespace#subj2\0\07urn:namespace#pred\0\u0009ob", new String(update.getColumnQualifier())); - } - - public static void main(String[] args) { - try { - Connector connector = new ZooKeeperInstance("stratus", "stratus13:2181").getConnector("root", "password".getBytes()); - Collection<Text> splits = connector.tableOperations().getSplits("partitionRdf", Integer.MAX_VALUE); - System.out.println(splits.size()); - System.out.println(splits); - } catch (Exception e) { - e.printStackTrace(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitionerTest.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitionerTest.java b/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitionerTest.java deleted file mode 100644 index bd63f6f..0000000 --- a/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitionerTest.java +++ /dev/null @@ -1,20 +0,0 @@ -package mvm.mmrts.rdf.partition.mr.fileinput.bulk; - -import junit.framework.TestCase; -import org.apache.hadoop.io.Text; - -/** - * Class EmbedKeyRangePartitionerTest - * Date: Sep 13, 2011 - * Time: 1:58:28 PM - */ -public class EmbedKeyRangePartitionerTest extends TestCase { - - public void testRetrieveEmbedKey() throws Exception { - assertEquals(new Text("hello"), EmbedKeyRangePartitioner.retrieveEmbedKey(new Text("hello\1there"))); - assertEquals(new Text("h"), EmbedKeyRangePartitioner.retrieveEmbedKey(new Text("h\1there"))); - assertEquals(new Text(""), EmbedKeyRangePartitioner.retrieveEmbedKey(new Text("\1there"))); - assertEquals(new Text("hello there"), EmbedKeyRangePartitioner.retrieveEmbedKey(new Text("hello there"))); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/partition.rdf/pom.xml ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/pom.xml b/partition/partition.rdf/pom.xml deleted file mode 100644 index 2701d64..0000000 --- a/partition/partition.rdf/pom.xml +++ /dev/null @@ -1,281 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <groupId>mvm.rya</groupId> - <artifactId>parent</artifactId> - <version>3.0.0.alpha1</version> - </parent> - <modelVersion>4.0.0</modelVersion> - - <groupId>mvm.mmrts.rdf</groupId> - <artifactId>partition.rdf</artifactId> - <version>1.0.0-SNAPSHOT</version> - <name>${project.groupId}.${project.artifactId}</name> - - <dependencies> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-runtime</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-queryresultio-sparqlxml</artifactId> - </dependency> - <dependency> - 
<groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-rio-rdfxml</artifactId> - </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - <dependency> - <groupId>sitestore.common</groupId> - <artifactId>common-query</artifactId> - </dependency> - <dependency> - <groupId>mvm.rya</groupId> - <artifactId>cloudbase.utils</artifactId> - </dependency> - - <!-- Cloudbase deps --> - <dependency> - <groupId>cloudbase</groupId> - <artifactId>cloudbase-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>zookeeper</artifactId> - </dependency> - - <!-- Test --> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - - <!-- Deps that are transitive but listed anyway - - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-model</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-query</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-queryalgebra-model</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-queryparser-api</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-queryparser-serql</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-queryparser-sparql</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-queryparser-serql</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-queryresultio-api</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-queryresultio-binary</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-queryresultio-sparqljson</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-queryresultio-text</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-repository-api</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-repository-manager</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-repository-event</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-repository-sail</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-sail-memory</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-sail-inferencer</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-queryalgebra-evaluation</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-repository-http</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-http-client</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-repository-contextaware</artifactId> - </dependency> - <dependency> - 
<groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-repository-dataset</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-http-protocol</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-rio-ntriples</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-rio-api</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-rio-n3</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-rio-trix</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-rio-turtle</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-rio-trig</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-sail-api</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-sail-nativerdf</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-sail-rdbms</artifactId> - </dependency> - <dependency> - <groupId>info.aduna.commons</groupId> - <artifactId>aduna-commons-collections</artifactId> - </dependency> - <dependency> - <groupId>info.aduna.commons</groupId> - <artifactId>aduna-commons-iteration</artifactId> - </dependency> - <dependency> - <groupId>info.aduna.commons</groupId> - <artifactId>aduna-commons-io</artifactId> - </dependency> - <dependency> - <groupId>info.aduna.commons</groupId> - <artifactId>aduna-commons-lang</artifactId> - </dependency> - <dependency> - <groupId>info.aduna.commons</groupId> - <artifactId>aduna-commons-i18n</artifactId> - </dependency> - <dependency> - <groupId>info.aduna.commons</groupId> - <artifactId>aduna-commons-concurrent</artifactId> - </dependency> - <dependency> - <groupId>info.aduna.commons</groupId> - <artifactId>aduna-commons-xml</artifactId> - </dependency> - <dependency> - <groupId>info.aduna.commons</groupId> - <artifactId>aduna-commons-text</artifactId> - </dependency> - <dependency> - <groupId>info.aduna.commons</groupId> - <artifactId>aduna-commons-net</artifactId> - </dependency> - <dependency> - <groupId>commons-dbcp</groupId> - <artifactId>commons-dbcp</artifactId> - </dependency> - <dependency> - <groupId>commons-pool</groupId> - <artifactId>commons-pool</artifactId> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> --> - - </dependencies> - <repositories> - <repository> - <releases> - <enabled>true</enabled> - </releases> - <snapshots> - <enabled>false</enabled> - </snapshots> - <id>aduna-opensource.releases</id> - <name>Aduna Open Source - Maven releases</name> - <url>http://repo.aduna-software.org/maven2/releases</url> - </repository> - </repositories> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <configuration> - <source>1.6</source> - <target>1.6</target> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <excludes> - <exclude>**/*IntegrationTest.java - </exclude> - </excludes> - </configuration> - </plugin> - </plugins> - </build> - -</project> 
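The column qualifier asserted in RdfFileInputToolTest above, "\07urn:namespace#subj2\0\07urn:namespace#pred\0\u0009ob", is simply the output of RdfIO.writeStatement: each term appears to be prefixed with a value-type marker byte, with \0 separating the terms. A small round-trip sketch, assuming the RdfIO helpers behave exactly as the tests and mappers in this diff use them; the class name is illustrative.

import com.google.common.io.ByteStreams;
import mvm.mmrts.rdf.partition.PartitionConstants;
import mvm.mmrts.rdf.partition.utils.RdfIO;
import org.openrdf.model.Statement;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.StatementImpl;
import org.openrdf.model.impl.ValueFactoryImpl;

public class StatementSerializationExample {
    public static void main(String[] args) throws Exception {
        ValueFactory vf = ValueFactoryImpl.getInstance();
        Statement stmt = new StatementImpl(
                vf.createURI("urn:namespace#subj2"),
                vf.createURI("urn:namespace#pred"),
                vf.createLiteral("ob"));

        // "document" layout (subject first), matching the bytes asserted in the test:
        // \07urn:namespace#subj2\0\07urn:namespace#pred\0\u0009ob
        byte[] doc = RdfIO.writeStatement(stmt, true);

        // round-trip the bytes back into a Statement
        Statement restored = RdfIO.readStatement(
                ByteStreams.newDataInput(doc), PartitionConstants.VALUE_FACTORY);
        System.out.println(restored.getSubject() + " " + restored.getPredicate() + " " + restored.getObject());
    }
}

InvalidValueTypeMarkerRuntimeException, which appears in the next file, is presumably what readStatement raises when that leading marker byte is not one it recognizes.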
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/InvalidValueTypeMarkerRuntimeException.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/InvalidValueTypeMarkerRuntimeException.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/InvalidValueTypeMarkerRuntimeException.java deleted file mode 100644 index 0c723a1..0000000 --- a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/InvalidValueTypeMarkerRuntimeException.java +++ /dev/null @@ -1,34 +0,0 @@ -package mvm.mmrts.rdf.partition; - -/** - * Class InvalidValueTypeMarkerRuntimeException - * Date: Jan 7, 2011 - * Time: 12:58:27 PM - */ -public class InvalidValueTypeMarkerRuntimeException extends RuntimeException { - private int valueTypeMarker = -1; - - public InvalidValueTypeMarkerRuntimeException(int valueTypeMarker) { - super(); - this.valueTypeMarker = valueTypeMarker; - } - - public InvalidValueTypeMarkerRuntimeException(int valueTypeMarker, String s) { - super(s); - this.valueTypeMarker = valueTypeMarker; - } - - public InvalidValueTypeMarkerRuntimeException(int valueTypeMarker, String s, Throwable throwable) { - super(s, throwable); - this.valueTypeMarker = valueTypeMarker; - } - - public InvalidValueTypeMarkerRuntimeException(int valueTypeMarker, Throwable throwable) { - super(throwable); - this.valueTypeMarker = valueTypeMarker; - } - - public int getValueTypeMarker() { - return valueTypeMarker; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConnection.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConnection.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConnection.java deleted file mode 100644 index 83e0675..0000000 --- a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConnection.java +++ /dev/null @@ -1,306 +0,0 @@ -package mvm.mmrts.rdf.partition; - -import cloudbase.core.client.BatchWriter; -import cloudbase.core.client.Connector; -import cloudbase.core.client.Scanner; -import cloudbase.core.client.admin.TableOperations; -import cloudbase.core.data.Key; -import cloudbase.core.data.Mutation; -import cloudbase.core.data.Range; -import cloudbase.core.security.ColumnVisibility; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import info.aduna.iteration.CloseableIteration; -import mvm.mmrts.rdf.partition.converter.ContextColVisConverter; -import mvm.mmrts.rdf.partition.iterators.NamespaceIterator; -import mvm.mmrts.rdf.partition.query.evaluation.FilterTimeIndexVisitor; -import mvm.mmrts.rdf.partition.query.evaluation.PartitionEvaluationStrategy; -import mvm.mmrts.rdf.partition.query.evaluation.SubjectGroupingOptimizer; -import mvm.mmrts.rdf.partition.shard.ShardValueGenerator; -import mvm.mmrts.rdf.partition.utils.ContextsStatementImpl; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.openrdf.model.*; -import org.openrdf.query.Binding; -import org.openrdf.query.BindingSet; -import org.openrdf.query.Dataset; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.algebra.QueryRoot; -import org.openrdf.query.algebra.TupleExpr; -import 
org.openrdf.query.impl.EmptyBindingSet; -import org.openrdf.sail.SailException; -import org.openrdf.sail.helpers.SailConnectionBase; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; - -import static mvm.mmrts.rdf.partition.PartitionConstants.*; -import static mvm.mmrts.rdf.partition.utils.RdfIO.writeStatement; -import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue; - -/** - * Class PartitionConnection - * Date: Jul 6, 2011 - * Time: 4:40:49 PM - * <p/> - * Ingest: - * Triple -> - * - <subject> <shard>: - * - <shard> event:<subject>\0<predicate>\0<object> - * - <shard> index:<predicate>\1<object>\0 - * <p/> - * Namespace -> - * - <prefix> ns:<namespace> - */ -public class PartitionConnection extends SailConnectionBase { - - private PartitionSail sail; - private BatchWriter writer; - private BatchWriter shardTableWriter; //MMRTS-148 - - private Multimap<Resource, ContextsStatementImpl> statements = HashMultimap.create(10000, 10); - - - public PartitionConnection(PartitionSail sailBase) throws SailException { - super(sailBase); - this.sail = sailBase; - this.initialize(); - } - - protected void initialize() throws SailException { - try { - Connector connector = sail.connector; - String table = sail.table; - String shardTable = sail.shardTable; - - //create these tables if they do not exist - TableOperations tableOperations = connector.tableOperations(); - boolean tableExists = tableOperations.exists(table); - if (!tableExists) - tableOperations.create(table); - - tableExists = tableOperations.exists(shardTable); - if(!tableExists) - tableOperations.create(shardTable); - - writer = connector.createBatchWriter(table, 1000000l, 60000l, 10); - shardTableWriter = connector.createBatchWriter(shardTable, 1000000l, 60000l, 10); - } catch (Exception e) { - throw new SailException(e); - } - } - - @Override - protected void closeInternal() throws SailException { - try { - writer.close(); - shardTableWriter.close(); - } catch (Exception e) { - throw new SailException(e); - } - } - - @Override - protected CloseableIteration<? 
extends BindingSet, QueryEvaluationException> evaluateInternal(TupleExpr tupleExpr, Dataset dataset, BindingSet bindingSet, boolean b) throws SailException { -// throw new UnsupportedOperationException("Query not supported"); - - if (!(tupleExpr instanceof QueryRoot)) - tupleExpr = new QueryRoot(tupleExpr); - - try { - Configuration queryConf = populateConf(bindingSet); - //timeRange filter check - tupleExpr.visit(new FilterTimeIndexVisitor(queryConf)); - - (new SubjectGroupingOptimizer(queryConf)).optimize(tupleExpr, dataset, bindingSet); - PartitionTripleSource source = new PartitionTripleSource(this.sail, queryConf); - - PartitionEvaluationStrategy strategy = new PartitionEvaluationStrategy( - source, dataset); - - return strategy.evaluate(tupleExpr, EmptyBindingSet.getInstance()); - } catch (Exception e) { - throw new SailException(e); - } - - } - - protected Configuration populateConf(BindingSet bs) { - Configuration conf = new Configuration(this.sail.conf); - - for (String bname : bs.getBindingNames()) { - conf.set(bname, bs.getValue(bname).stringValue()); - } - Binding start = bs.getBinding(START_BINDING); - if (start != null) - conf.setLong(START_BINDING, Long.parseLong(start.getValue().stringValue())); - - Binding end = bs.getBinding(END_BINDING); - if (end != null) - conf.setLong(END_BINDING, Long.parseLong(end.getValue().stringValue())); - - Binding timePredicate = bs.getBinding(TIME_PREDICATE); - if (timePredicate != null) - conf.set(TIME_PREDICATE, timePredicate.getValue().stringValue()); - - Binding timeType = bs.getBinding(TIME_TYPE_PROP); - if (timeType != null) - conf.set(TIME_TYPE_PROP, timeType.getValue().stringValue()); - else if (timePredicate != null) - conf.set(TIME_TYPE_PROP, TimeType.XMLDATETIME.name()); //default to xml datetime - - return conf; - } - - @Override - protected CloseableIteration<? extends Resource, SailException> getContextIDsInternal() throws SailException { - throw new UnsupportedOperationException("Contexts not supported"); - } - - @Override - protected CloseableIteration<? extends Statement, SailException> getStatementsInternal(Resource resource, URI uri, Value value, boolean b, Resource... resources) throws SailException { - throw new UnsupportedOperationException("Query not supported"); - } - - @Override - protected long sizeInternal(Resource... 
resources) throws SailException { - throw new UnsupportedOperationException("Size operation not supported"); - } - - @Override - protected void startTransactionInternal() throws SailException { - // no transaction support as of yet - } - - @Override - protected void commitInternal() throws SailException { - try { - ShardValueGenerator gen = sail.generator; - ContextColVisConverter contextColVisConverter = sail.contextColVisConverter; - Map<Resource, Collection<ContextsStatementImpl>> map = statements.asMap(); - for (Map.Entry<Resource, Collection<ContextsStatementImpl>> entry : map.entrySet()) { - Resource subject = entry.getKey(); - byte[] subj_bytes = writeValue(subject); - String shard = gen.generateShardValue(subject); - Text shard_txt = new Text(shard); - Collection<ContextsStatementImpl> stmts = entry.getValue(); - - /** - * Triple - > - *- < subject ><shard >: - *- < shard > event:<subject >\0 < predicate >\0 < object > - *- < shard > index:<predicate >\1 < object >\0 - */ - Mutation m_subj = new Mutation(shard_txt); - for (ContextsStatementImpl stmt : stmts) { - Resource[] contexts = stmt.getContexts(); - ColumnVisibility vis = null; - if (contexts != null && contexts.length > 0 && contextColVisConverter != null) { - vis = contextColVisConverter.convertContexts(contexts); - } - - if (vis != null) { - m_subj.put(DOC, new Text(writeStatement(stmt, true)), vis, EMPTY_VALUE); - m_subj.put(INDEX, new Text(writeStatement(stmt, false)), vis, EMPTY_VALUE); - } else { - m_subj.put(DOC, new Text(writeStatement(stmt, true)), EMPTY_VALUE); - m_subj.put(INDEX, new Text(writeStatement(stmt, false)), EMPTY_VALUE); - } - } - - /** - * TODO: Is this right? - * If the subject does not have any authorizations specified, then anyone can access it. - * But the true authorization check will happen at the predicate/object level, which means that - * the set returned will only be what the person is authorized to see. The shard lookup table has to - * have the lowest level authorization all the predicate/object authorizations; otherwise, - * a user may not be able to see the correct document. - */ - Mutation m_shard = new Mutation(new Text(subj_bytes)); - m_shard.put(shard_txt, EMPTY_TXT, EMPTY_VALUE); - shardTableWriter.addMutation(m_shard); - - writer.addMutation(m_subj); - } - - writer.flush(); - shardTableWriter.flush(); - statements.clear(); - } catch (Exception e) { - throw new SailException(e); - } - finally { - } - } - - @Override - protected void rollbackInternal() throws SailException { - statements.clear(); - } - - @Override - protected void addStatementInternal(Resource subject, URI predicate, Value object, Resource... contexts) throws SailException { - statements.put(subject, new ContextsStatementImpl(subject, predicate, object, contexts)); - } - - @Override - protected void removeStatementsInternal(Resource resource, URI uri, Value value, Resource... contexts) throws SailException { - throw new UnsupportedOperationException("Remove not supported as of yet"); - } - - @Override - protected void clearInternal(Resource... resources) throws SailException { - throw new UnsupportedOperationException("Clear with context not supported as of yet"); - } - - @Override - protected CloseableIteration<? 
extends Namespace, SailException> getNamespacesInternal() throws SailException { - return new NamespaceIterator(sail.connector, sail.table); - } - - @Override - protected String getNamespaceInternal(String prefix) throws SailException { - try { - Scanner scanner = sail.connector.createScanner(sail.table, ALL_AUTHORIZATIONS); - scanner.setRange(new Range(new Text(prefix))); - scanner.fetchColumnFamily(NAMESPACE); - Iterator<Map.Entry<Key, cloudbase.core.data.Value>> iter = scanner.iterator(); - if (iter != null && iter.hasNext()) - return iter.next().getKey().getColumnQualifier().toString(); - } catch (Exception e) { - throw new SailException(e); - } - return null; - } - - @Override - protected void setNamespaceInternal(String prefix, String namespace) throws SailException { - /** - * Namespace -> - * - <prefix> <namespace>: - */ - - try { - Mutation m = new Mutation(new Text(prefix)); - m.put(NAMESPACE, new Text(namespace), EMPTY_VALUE); - writer.addMutation(m); - } catch (Exception e) { - throw new SailException(e); - } - } - - @Override - protected void removeNamespaceInternal - (String - s) throws SailException { - throw new UnsupportedOperationException("Namespace remove not supported"); - } - - @Override - protected void clearNamespacesInternal - () throws SailException { - throw new UnsupportedOperationException("Namespace Clear not supported"); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConstants.java ---------------------------------------------------------------------- diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConstants.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConstants.java deleted file mode 100644 index cb69596..0000000 --- a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConstants.java +++ /dev/null @@ -1,141 +0,0 @@ -package mvm.mmrts.rdf.partition; - -import cloudbase.core.CBConstants; -import cloudbase.core.data.Value; -import cloudbase.core.security.Authorizations; -import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.openrdf.model.Literal; -import org.openrdf.model.URI; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; - -import java.text.SimpleDateFormat; -import java.util.Date; - -/** - * Class PartitionConstants - * Date: Jul 6, 2011 - * Time: 12:22:55 PM - */ -public class PartitionConstants { - - public static final String PARTITION_NS = "urn:mvm.mmrts.partition.rdf/08/2011#"; - public static ValueFactory VALUE_FACTORY = ValueFactoryImpl.getInstance(); - public static URI TIMERANGE = VALUE_FACTORY.createURI(PARTITION_NS, "timeRange"); - public static URI SHARDRANGE = VALUE_FACTORY.createURI(PARTITION_NS, "shardRange"); //shardRange(subject, start, stop) in ms - public static Literal EMPTY_LITERAL = VALUE_FACTORY.createLiteral(0); - - public static final byte FAMILY_DELIM = 0; - public static final String FAMILY_DELIM_STR = "\0"; - public static final byte INDEX_DELIM = 1; - public static final String INDEX_DELIM_STR = "\1"; - - /* RECORD TYPES */ -// public static final int NAMESPACE_MARKER = 2; -// -// public static final int EXPL_TRIPLE_MARKER = 3; -// -// public static final int EXPL_QUAD_MARKER = 4; -// -// public static final int INF_TRIPLE_MARKER = 5; -// -// public static final int INF_QUAD_MARKER = 6; - - public 
static final int URI_MARKER = 7; - - public static final String URI_MARKER_STR = "\07"; - - public static final int BNODE_MARKER = 8; - - public static final int PLAIN_LITERAL_MARKER = 9; - - public static final String PLAIN_LITERAL_MARKER_STR = "\u0009"; - - public static final int LANG_LITERAL_MARKER = 10; - - public static final int DATATYPE_LITERAL_MARKER = 11; - - public static final String DATATYPE_LITERAL_MARKER_STR = "\u000B"; - - public static final int EOF_MARKER = 127; - - // public static final Authorizations ALL_AUTHORIZATIONS = new Authorizations( - // "_"); - public static final Authorizations ALL_AUTHORIZATIONS = CBConstants.NO_AUTHS; - - public static final Value EMPTY_VALUE = new Value(new byte[0]); - public static final Text EMPTY_TXT = new Text(""); - - /* Column Families and Qualifiers */ - public static final Text INDEX = new Text("index"); - public static final Text DOC = new Text("event"); - public static final Text NAMESPACE = new Text("ns"); - - /* Time constants */ - public static final String START_BINDING = "binding.start"; - public static final String END_BINDING = "binding.end"; - public static final String TIME_PREDICATE = "binding.timePredicate"; - public static final String SHARDRANGE_BINDING = "binding.shardRange"; - public static final String SHARDRANGE_START = "binding.shardRange.start"; - public static final String SHARDRANGE_END = "binding.shardRange.end"; - public static final String TIME_TYPE_PROP = "binding.timeProp"; - public static final String AUTHORIZATION_PROP = "binding.authorization"; - public static final String NUMTHREADS_PROP = "binding.numthreads"; - public static final String ALLSHARDS_PROP = "binding.allshards"; - - public static final String VALUE_DELIMITER = "\03"; - - public static final SimpleDateFormat XMLDATE = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); - - public enum TimeType { - TIMESTAMP, XMLDATETIME - } - - public static boolean isTimeRange(ShardSubjectLookup lookup, Configuration configuration) { - return (configuration.get(TIME_PREDICATE) != null) || (lookup.getTimePredicate() != null); - } - - public static Long validateFillStartTime(Long start, ShardSubjectLookup lookup) { - if (lookup.getShardStartTimeRange() != null) - return Long.parseLong(lookup.getShardEndTimeRange()); - return (start == null) ? 0l : start; - } - - public static Long validateFillEndTime(Long end, ShardSubjectLookup lookup) { - if (lookup.getShardEndTimeRange() != null) - return Long.parseLong(lookup.getShardEndTimeRange()); - return (end == null) ? 
System.currentTimeMillis() : end; - } - - public static String getStartTimeRange(ShardSubjectLookup lookup, Configuration configuration) { - String tp = configProperty(configuration, TIME_PREDICATE, lookup.getTimePredicate()); - String st = configProperty(configuration, START_BINDING, lookup.getStartTimeRange()); - TimeType tt = lookup.getTimeType(); - if (tt == null) - tt = TimeType.valueOf(configuration.get(TIME_TYPE_PROP)); - return URI_MARKER_STR + tp + INDEX_DELIM_STR + convertTime(Long.parseLong(st), tt); - } - - public static String getEndTimeRange(ShardSubjectLookup lookup, Configuration configuration) { - String tp = configProperty(configuration, TIME_PREDICATE, lookup.getTimePredicate()); - String et = configProperty(configuration, END_BINDING, lookup.getEndTimeRange()); - TimeType tt = lookup.getTimeType(); - if (tt == null) - tt = TimeType.valueOf(configuration.get(TIME_TYPE_PROP)); - return URI_MARKER_STR + tp + INDEX_DELIM_STR + convertTime(Long.parseLong(et), tt); - } - - public static String convertTime(Long timestamp, TimeType timeType) { - return (TimeType.XMLDATETIME.equals(timeType)) - ? (DATATYPE_LITERAL_MARKER_STR + XMLDATE.format(new Date(timestamp))) - : PLAIN_LITERAL_MARKER_STR + timestamp; - } - - public static String configProperty(Configuration configuration, String property, String checkValue) { - if (checkValue == null) - return configuration.get(property); - return checkValue; - } -}
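Taken together, PartitionConnection.commitInternal and the markers and delimiters in PartitionConstants define the on-disk layout: statements are grouped under a shard row with an "event" (document) column and an "index" column, a second table maps each subject back to its shard (MMRTS-148), and time-range scans are bounded by keys built from URI_MARKER_STR, INDEX_DELIM_STR and convertTime. The sketch below is a minimal, self-contained reconstruction of those key strings only; it uses plain strings for the RDF values and a made-up shard id, whereas the deleted code serializes values through RdfIO.writeStatement/writeValue and obtains the shard from a ShardValueGenerator, so treat it as an illustration of the layout rather than the actual encoding.

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class PartitionLayoutSketch {

        // Mirrors PartitionConstants (values copied from the diff above).
        static final String FAMILY_DELIM_STR = "\0";
        static final String INDEX_DELIM_STR = "\1";
        static final String URI_MARKER_STR = "\07";
        static final String DATATYPE_LITERAL_MARKER_STR = "\u000B";
        static final SimpleDateFormat XMLDATE =
                new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");

        public static void main(String[] args) {
            // Hypothetical triple; the real connection works with OpenRDF Resource/URI/Value objects.
            String subject   = "urn:example#alice";
            String predicate = "urn:example#knows";
            String object    = "urn:example#bob";
            String shard     = "2011-09-13_042"; // stand-in for a DateHashModShardValueGenerator result

            // Main table, "event" (DOC) column family: the whole statement in document order.
            String eventQualifier = subject + FAMILY_DELIM_STR + predicate + FAMILY_DELIM_STR + object;

            // Main table, "index" column family: predicate/object first, per the class javadoc,
            // so predicate-bound lookups can seek directly.
            String indexQualifier = predicate + INDEX_DELIM_STR + object + FAMILY_DELIM_STR;

            System.out.println("row=" + shard + " cf=event cq=" + eventQualifier);
            System.out.println("row=" + shard + " cf=index cq=" + indexQualifier);

            // Shard lookup table (MMRTS-148): row = subject, column family = shard,
            // empty qualifier and value.
            System.out.println("shard-table row=" + subject + " cf=" + shard);

            // Lower bound of an index scan over a time range, following
            // getStartTimeRange/convertTime: URI marker + time predicate + index
            // delimiter + datatype-literal marker + formatted timestamp.
            long start = System.currentTimeMillis();
            String startKey = URI_MARKER_STR + "urn:example#observedAt" + INDEX_DELIM_STR
                    + DATATYPE_LITERAL_MARKER_STR + XMLDATE.format(new Date(start));
            System.out.println("index scan lower bound (XMLDATETIME) = " + startKey);
        }
    }

Keying both the event and index columns by shard keeps all of a partition's triples together for document-style scans, while the small subject-to-shard lookup table lets a subject-bound query jump straight to the right shard instead of scanning every partition, which appears to be the point of the MMRTS-148 shard table.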
