http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java new file mode 100644 index 0000000..4b369ae --- /dev/null +++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java @@ -0,0 +1,155 @@ +package mvm.mmrts.rdf.partition.mr; + +import com.google.common.io.ByteStreams; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Date; + +/** + * Class SparqlTestDriver + * Date: Oct 28, 2010 + * Time: 2:53:39 PM + */ +public class SparqlTestDriver implements Tool { + + public static void main(String[] args) { + try { + ToolRunner.run(new Configuration(), new SparqlTestDriver(), args); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private Configuration conf; + + public Configuration getConf() { + return conf; + } + + public void setConf(Configuration conf) { + this.conf = conf; + } + + public int run(String[] args) throws IOException, InterruptedException, + ClassNotFoundException { + + //query from file + if(args.length < 2) { + throw new IllegalArgumentException("Usage: hadoop 
jar mvm.mmrts.rdf.partition.mr.SparqlTestDriver <local query file> outputFile"); + } + + FileInputStream fis = new FileInputStream(args[0]); + String query = new String(ByteStreams.toByteArray(fis)); + fis.close(); + + Job job = new Job(conf); + job.setJarByClass(SparqlTestDriver.class); + + // set up cloudbase input + job.setInputFormatClass(SparqlPartitionStoreInputFormat.class); + SparqlPartitionStoreInputFormat.setInputInfo(job, "root", "password".getBytes()); + SparqlPartitionStoreInputFormat.setZooKeeperInstance(job, "stratus", "10.40.190.113:2181"); + SparqlPartitionStoreInputFormat.setLongJob(job, null); + SparqlPartitionStoreInputFormat.setTable(job, "partitionRdf"); + + long startTime_l = 1303811164088l; + long ttl = 86400000; + + //set query +// String query = "PREFIX tdp: <http://here/2010/tracked-data-provenance/ns#>\n" + +// "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" + +// "PREFIX mvmpart: <urn:mvm.mmrts.partition.rdf/08/2011#>\n" + +// "SELECT * WHERE\n" + +// "{\n" + +// "?id tdp:reportedAt ?timestamp. \n" + +// "FILTER(mvmpart:timeRange(?id, tdp:reportedAt, 1314380456900 , 1314384056900 , 'XMLDATETIME')).\n" + +// "?id tdp:performedBy ?system.\n" + +// "} \n"; +// +// String query2 = "PREFIX hb: <http://here/2010/tracked-data-provenance/heartbeat/ns#>\n" + +// "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" + +// "PREFIX mvmpart: <urn:mvm.mmrts.partition.rdf/08/2011#>\n" + +// "SELECT * WHERE\n" + +// "{\n" + +// "?id hb:timeStamp ?timestamp. 
\n" + +// "FILTER(mvmpart:timeRange(?id, hb:timeStamp, 1314360009522 , 1314367209522 , 'TIMESTAMP')).\n" + +// "?id hb:count ?count.\n" + +// "?id hb:systemName ?systemName.\n" + +// "} "; + + System.out.println(query); + System.out.println(); +// System.out.println(query2); + + SparqlPartitionStoreInputFormat.setSparqlQueries(job, query); +// SparqlCloudbaseStoreInputFormat.setStartTime(job, 1309956861000l + ""); +// SparqlCloudbaseStoreInputFormat.setTtl(job, 86400000 + ""); + + // set input output of the particular job + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Text.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + + //job.setOutputFormatClass(FileOutputFormat.class); + + + // set mapper and reducer classes + job.setMapperClass(MyTempMapper.class); + job.setReducerClass(Reducer.class); + job.setNumReduceTasks(1); + + // set output + Path outputDir = new Path(args[1]); + FileSystem dfs = FileSystem.get(outputDir.toUri(), conf); + if (dfs.exists(outputDir)) + dfs.delete(outputDir, true); + + FileOutputFormat.setOutputPath(job, outputDir); + + // Submit the job + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int exitCode = job.waitForCompletion(true) ? 
0 : 1; + + if (exitCode == 0) { + Date end_time = new Date(); + System.out.println("Job ended: " + end_time); + System.out.println("The job took " + + (end_time.getTime() - startTime.getTime()) / 1000 + + " seconds."); + return (int) job + .getCounters() + .findCounter("org.apache.hadoop.mapred.Task$Counter", + "REDUCE_OUTPUT_RECORDS").getValue(); + } else { + System.out.println("Job Failed!!!"); + } + + return -1; + } + + public static class MyTempMapper extends Mapper<LongWritable, MapWritable, Text, Text> { + Text outKey = new Text(); + Text outValue = new Text("partition"); + @Override + protected void map(LongWritable key, MapWritable value, Context context) throws IOException, InterruptedException { + outKey.set(value.values().toString()); + context.write(outKey, outValue); + } + } +}
// http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java
// ----------------------------------------------------------------------
// diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java
// new file mode 100644
// index 0000000..80255ba
// --- /dev/null
// +++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java
// @@ -0,0 +1,154 @@
package mvm.mmrts.rdf.partition.mr;

import com.google.common.io.ByteStreams;
import com.google.common.primitives.Bytes;
import mvm.mmrts.rdf.partition.PartitionConstants;
import mvm.mmrts.rdf.partition.utils.RdfIO;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.openrdf.model.Statement;

import java.io.IOException;
import java.util.Arrays;
import java.util.Date;

/**
 * Groups serialized statements by subject and writes one merged line per subject.
 * <p>
 * Input: text lines whose bytes before the last NUL ({@code \0}) are a statement readable by
 * {@link RdfIO}. Only statements whose predicate contains {@code performedBy},
 * {@code reportedAt} or {@code type} are kept.
 * Output: for each subject, the values of the merged map (id, system, timestamp, type) as
 * the key and the constant {@code partitionBS} as the value.
 * <p>
 * Usage: {@code TestDriver [inputPath [outputDir]]}. The paths default to
 * {@value #DEFAULT_INPUT_PATH} and {@value #DEFAULT_OUTPUT_PATH}. The output directory
 * is deleted first if it already exists.
 *
 * Class TestDriver
 * Date: Oct 28, 2010
 * Time: 2:53:39 PM
 */
public class TestDriver implements Tool {

    /** Input file read when no input path argument is given. */
    public static final String DEFAULT_INPUT_PATH = "/temp/rpunnoose/results.txt";
    /** Output directory used when no output path argument is given. */
    public static final String DEFAULT_OUTPUT_PATH = "/temp/rpunnoose/partBS";

    public static void main(String[] args) {
        try {
            ToolRunner.run(new Configuration(), new TestDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private Configuration conf;

    public Configuration getConf() {
        return conf;
    }

    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    /**
     * Runs the grouping job, blocking until it finishes.
     *
     * @param args optional {@code args[0]} input path(s) (comma separated) and {@code args[1]} output directory
     * @return the job's REDUCE_OUTPUT_RECORDS counter (truncated to int) on success, {@code -1} on failure
     */
    public int run(String[] args) throws IOException, InterruptedException,
            ClassNotFoundException {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT_PATH;

        Job job = new Job(conf);
        job.setJarByClass(TestDriver.class);

        FileInputFormat.addInputPaths(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // set input output of the particular job
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MapWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);

        // set mapper and reducer classes
        job.setMapperClass(SubjectMapWrMapper.class);
        job.setReducerClass(OutMapWrReducer.class);
        job.setNumReduceTasks(1);

        // set output, replacing the results of any previous run
        Path outputDir = new Path(outputPath);
        FileSystem dfs = FileSystem.get(outputDir.toUri(), conf);
        if (dfs.exists(outputDir)) {
            dfs.delete(outputDir, true);
        }
        FileOutputFormat.setOutputPath(job, outputDir);

        // Submit the job
        Date startTime = new Date();
        System.out.println("Job started: " + startTime);
        int exitCode = job.waitForCompletion(true) ? 0 : 1;

        if (exitCode == 0) {
            Date end_time = new Date();
            System.out.println("Job ended: " + end_time);
            System.out.println("The job took "
                    + (end_time.getTime() - startTime.getTime()) / 1000
                    + " seconds.");
            return (int) job
                    .getCounters()
                    .findCounter("org.apache.hadoop.mapred.Task$Counter",
                            "REDUCE_OUTPUT_RECORDS").getValue();
        } else {
            System.out.println("Job Failed!!!");
        }

        return -1;
    }

    /**
     * Decodes the statement on each input line. For the tracked predicates it emits
     * {@code subject -> {id: subject, <field>: object}}, where the field is {@code system},
     * {@code timestamp} or {@code type}.
     */
    public static class SubjectMapWrMapper extends Mapper<LongWritable, Text, Text, MapWritable> {
        /** Counter group and name for lines without a NUL separator (they are skipped). */
        static final String COUNTER_GROUP = "TestDriver";
        static final String MALFORMED_LINES = "MALFORMED_LINES";

        Text outKey = new Text();
        final String ID = "id";
        final Text ID_TXT = new Text(ID);
        final String PERF_AT = "performedBy";
        final Text PERF_AT_TXT = new Text("system");
        final String REPORT_AT = "reportedAt";
        final Text REPORT_AT_TXT = new Text("timestamp");
        final String TYPE = "type";
        final Text TYPE_TXT = new Text(TYPE);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // The serialized statement is every byte before the last NUL. Slice the raw bytes;
            // decoding to a String and re-encoding with the platform charset could alter them.
            byte[] raw = value.getBytes();
            int end = lastIndexOf(raw, value.getLength(), (byte) 0);
            if (end < 0) {
                // Previously substring(0, -1) threw here and failed the whole task.
                context.getCounter(COUNTER_GROUP, MALFORMED_LINES).increment(1);
                return;
            }
            Statement stmt = RdfIO.readStatement(
                    ByteStreams.newDataInput(Arrays.copyOf(raw, end)), PartitionConstants.VALUE_FACTORY);

            Text field = fieldFor(stmt.getPredicate().stringValue());
            if (field == null) {
                return;
            }

            String subject = stmt.getSubject().stringValue();
            outKey.set(subject);
            MapWritable mw = new MapWritable();
            // store a separate copy of the subject: outKey is reused for every record
            mw.put(ID_TXT, new Text(subject));
            mw.put(field, new Text(stmt.getObject().stringValue()));
            context.write(outKey, mw);
        }

        /**
         * Returns the output field name for a tracked predicate, or null if the predicate is not tracked.
         * Patterns are checked in the order performedBy, reportedAt, type.
         */
        private Text fieldFor(String predStr) {
            if (predStr.contains(PERF_AT)) {
                return PERF_AT_TXT;
            }
            if (predStr.contains(REPORT_AT)) {
                return REPORT_AT_TXT;
            }
            if (predStr.contains(TYPE)) {
                return TYPE_TXT;
            }
            return null;
        }

        /** Returns the index of the last {@code b} in {@code bytes[0, length)}, or -1 if absent. */
        private static int lastIndexOf(byte[] bytes, int length, byte b) {
            for (int i = length - 1; i >= 0; i--) {
                if (bytes[i] == b) {
                    return i;
                }
            }
            return -1;
        }
    }

    /**
     * Merges all maps emitted for one subject. It writes the merged map's values as the key,
     * with the constant value {@code partitionBS}.
     */
    public static class OutMapWrReducer extends Reducer<Text, MapWritable, Text, Text> {
        final Text PART = new Text("partitionBS");
        Text outKey = new Text();

        @Override
        protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
            MapWritable mw = new MapWritable();
            for (MapWritable value : values) {
                mw.putAll(value);
            }
            outKey.set(mw.values().toString());
            context.write(outKey, PART);
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java
// ----------------------------------------------------------------------
// diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java
// new file mode 100644
// index 0000000..2b4565f
// --- /dev/null
// +++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java
// @@ -0,0 +1,229 @@
package mvm.mmrts.rdf.partition.mr.compat;

import cloudbase.core.CBConstants;
import cloudbase.core.client.mapreduce.CloudbaseInputFormat;
import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
import cloudbase.core.data.Key;
import cloudbase.core.data.Mutation;
import cloudbase.core.data.Range;
import cloudbase.core.data.Value;
import cloudbase.core.security.ColumnVisibility;
import mvm.mmrts.rdf.partition.PartitionConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Rewrites the date prefix of shard ids in the partition table from one date format and
 * delimiter to another. With the defaults, {@code 2011-12-3} becomes {@code 20111201_3}.
 * <p>
 * For the {@code event} and {@code index} column families, the shard is the row id. For every
 * other column family, the shard is the column family itself. Each converted entry is
 * written under the new shard, and the old entry is deleted. Entries are left untouched if
 * their shard has no old delimiter, if its date does not parse with the old format, or if
 * it would not change.
 * <p>
 * All settings are read from the job configuration using the {@code *_PROP} and
 * {@code *_DELIM} keys below.
 *
 * Class ChangeShardDateFormatTool
 * Date: Dec 8, 2011
 * Time: 4:11:40 PM
 */
public class ChangeShardDateFormatTool implements Tool {
    public static final String CB_USERNAME_PROP = "cb.username";
    public static final String CB_PWD_PROP = "cb.pwd";
    public static final String CB_ZK_PROP = "cb.zk";
    public static final String CB_INSTANCE_PROP = "cb.instance";
    public static final String PARTITION_TABLE_PROP = "partition.table";
    public static final String OLD_DATE_FORMAT_PROP = "date.format.old";
    public static final String NEW_DATE_FORMAT_PROP = "date.format.new";
    public static final String OLD_DATE_SHARD_DELIM = "date.shard.delim.old";
    public static final String NEW_DATE_SHARD_DELIM = "date.shard.delim.new";

    private Configuration conf;

    private String userName = "root";
    private String pwd = "password";
    private String instance = "stratus";
    private String zk = "10.40.190.113:2181";
    private String partitionTable = "rdfPartition";
    private String oldDateFormat = "yyyy-MM";
    private String newDateFormat = "yyyyMMdd";
    private String oldDateDelim = "-";
    private String newDateDelim = "_";

    public Configuration getConf() {
        return conf;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new Configuration(), new ChangeShardDateFormatTool(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Runs the conversion job.
     *
     * @return {@code 0} on success, {@code 1} if the job failed (previously always {@code 0})
     */
    @Override
    public int run(String[] args) throws Exception {
        return runJob(args) < 0 ? 1 : 0;
    }

    /**
     * Configures and runs the map-only conversion job, blocking until it finishes.
     *
     * @return the number of mutations written (MAP_OUTPUT_RECORDS) on success, {@code -1} on failure
     */
    public long runJob(String[] args) throws Exception {
        //faster
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
        conf.set("io.sort.mb", "256");

        zk = conf.get(CB_ZK_PROP, zk);
        instance = conf.get(CB_INSTANCE_PROP, instance);
        userName = conf.get(CB_USERNAME_PROP, userName);
        pwd = conf.get(CB_PWD_PROP, pwd);
        partitionTable = conf.get(PARTITION_TABLE_PROP, partitionTable);
        oldDateFormat = conf.get(OLD_DATE_FORMAT_PROP, oldDateFormat);
        newDateFormat = conf.get(NEW_DATE_FORMAT_PROP, newDateFormat);
        oldDateDelim = conf.get(OLD_DATE_SHARD_DELIM, oldDateDelim);
        newDateDelim = conf.get(NEW_DATE_SHARD_DELIM, newDateDelim);
        // publish the resolved values so the mappers see them
        conf.set(NEW_DATE_FORMAT_PROP, newDateFormat);
        conf.set(OLD_DATE_FORMAT_PROP, oldDateFormat);
        conf.set(PARTITION_TABLE_PROP, partitionTable);
        conf.set(OLD_DATE_SHARD_DELIM, oldDateDelim);
        conf.set(NEW_DATE_SHARD_DELIM, newDateDelim);

        Job job = new Job(conf);
        job.setJarByClass(ChangeShardDateFormatTool.class);

        job.setInputFormatClass(CloudbaseInputFormat.class);
        //TODO: How should I send in Auths?
        CloudbaseInputFormat.setInputInfo(job, userName, pwd.getBytes(),
                partitionTable, CBConstants.NO_AUTHS);
        CloudbaseInputFormat.setZooKeeperInstance(job, instance, zk);

        job.setMapperClass(ChangeDateFormatMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Mutation.class);

        job.setOutputFormatClass(CloudbaseOutputFormat.class);
        CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, partitionTable);
        CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk);

        job.setNumReduceTasks(0);

        // Submit the job
        Date startTime = new Date();
        System.out.println("Job started: " + startTime);
        int exitCode = job.waitForCompletion(true) ? 0 : 1;

        if (exitCode == 0) {
            Date end_time = new Date();
            System.out.println("Job ended: " + end_time);
            System.out.println("The job took "
                    + (end_time.getTime() - startTime.getTime()) / 1000
                    + " seconds.");
            // Map-only job: REDUCE_OUTPUT_RECORDS is always 0, so report what the mappers wrote.
            return job
                    .getCounters()
                    .findCounter("org.apache.hadoop.mapred.Task$Counter",
                            "MAP_OUTPUT_RECORDS").getValue();
        } else {
            System.out.println("Job Failed!!!");
        }

        return -1;
    }

    /**
     * Rewrites one partition-table entry under its converted shard and deletes the old entry.
     * Entries whose shard cannot be converted are ignored.
     */
    public static class ChangeDateFormatMapper extends Mapper<Key, Value, Text, Mutation> {
        private SimpleDateFormat oldDateFormat_df;
        private SimpleDateFormat newDateFormat_df;
        private Text partTableTxt;
        private String newDateDelim;
        private String oldDateDelim;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            String oldDateFormat = context.getConfiguration().get(OLD_DATE_FORMAT_PROP);
            if (oldDateFormat == null)
                throw new IllegalArgumentException("Old Date Format property cannot be null");

            oldDateFormat_df = new SimpleDateFormat(oldDateFormat);

            String newDateFormat = context.getConfiguration().get(NEW_DATE_FORMAT_PROP);
            if (newDateFormat == null)
                throw new IllegalArgumentException("New Date Format property cannot be null");

            newDateFormat_df = new SimpleDateFormat(newDateFormat);

            String partTable = context.getConfiguration().get(PARTITION_TABLE_PROP);
            if (partTable == null)
                throw new IllegalArgumentException("Partition Table property cannot be null");

            partTableTxt = new Text(partTable);

            oldDateDelim = context.getConfiguration().get(OLD_DATE_SHARD_DELIM);
            if (oldDateDelim == null)
                throw new IllegalArgumentException("Old Date Shard Delimiter property cannot be null");

            newDateDelim = context.getConfiguration().get(NEW_DATE_SHARD_DELIM);
            if (newDateDelim == null)
                throw new IllegalArgumentException("New Date Shard Delimiter property cannot be null");
        }

        @Override
        protected void map(Key key, Value value, Context context)
                throws IOException, InterruptedException {
            try {
                String cf = key.getColumnFamily().toString();
                // Deletes must carry the entry's visibility, or they will not match the stored key.
                ColumnVisibility visibility = new ColumnVisibility(key.getColumnVisibility());
                long now = System.currentTimeMillis();
                if ("event".equals(cf) || "index".equals(cf)) {
                    // the shard is the row id
                    String shard = key.getRow().toString();
                    String newShard = convertShard(shard);
                    if (newShard == null || newShard.equals(shard))
                        return; // nothing to convert; writing then deleting the same key would lose it

                    Mutation mutation = new Mutation(new Text(newShard));
                    mutation.put(key.getColumnFamily(), key.getColumnQualifier(), visibility, now, value);
                    context.write(partTableTxt, mutation);

                    //delete
                    mutation = new Mutation(key.getRow());
                    mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier(), visibility, now);
                    context.write(partTableTxt, mutation);
                } else {
                    // shard index: the shard is the column family
                    String newShard = convertShard(cf);
                    if (newShard == null || newShard.equals(cf))
                        return; // nothing to convert; writing then deleting the same key would lose it

                    Mutation mutation = new Mutation(key.getRow());
                    mutation.put(new Text(newShard), key.getColumnQualifier(), visibility, now, value);

                    //delete
                    mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier(), visibility, now);
                    context.write(partTableTxt, mutation);
                }
            } catch (ParseException pe) {
                // only do work for the rows that match the old date format
            }
        }

        /**
         * Converts {@code <oldDate><oldDelim><value>} to {@code <newDate><newDelim><value>}.
         * The last occurrence of the old delimiter separates the date from the shard value.
         *
         * @return the converted shard, or null if {@code shard} contains no old delimiter
         * @throws ParseException if the date part does not match the old date format
         */
        private String convertShard(String shard) throws ParseException {
            int delimIndex = shard.lastIndexOf(oldDateDelim);
            if (delimIndex == -1)
                return null; //no shard?
            String date_s = shard.substring(0, delimIndex);
            // skip the whole delimiter (previously only one character was skipped)
            String shardValue = shard.substring(delimIndex + oldDateDelim.length());

            Date date = oldDateFormat_df.parse(date_s);
            return newDateFormat_df.format(date) + newDateDelim + shardValue;
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java
// ----------------------------------------------------------------------
// diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java
// new file mode 100644
// index 0000000..ba2eece
// --- /dev/null
// +++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java
// @@ -0,0 +1,171 @@
package mvm.mmrts.rdf.partition.mr.compat;

import cloudbase.core.CBConstants;
import cloudbase.core.client.mapreduce.CloudbaseInputFormat;
import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
import cloudbase.core.data.Key;
import cloudbase.core.data.Mutation;
import cloudbase.core.data.Range;
import cloudbase.core.data.Value;
import cloudbase.core.security.ColumnVisibility;
import mvm.mmrts.rdf.partition.PartitionConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Collections;
import java.util.Date;

/**
 * MMRTS-148 Need to move the shard index from the partition table to the shardIndex table.
 * <p>
 * Copies every partition-table entry whose row lies in the range from
 * {@code PartitionConstants.URI_MARKER_STR} to {@code PartitionConstants.PLAIN_LITERAL_MARKER_STR}
 * into the shard index table. If {@value #SHARD_INDEX_DELETE_PROP} is {@code true}
 * (default {@code false}), each copied entry is also deleted from the partition table.
 *
 * Class MoveShardIndexTool
 * Date: Dec 8, 2011
 * Time: 4:11:40 PM
 */
public class MoveShardIndexTool implements Tool {
    public static final String CB_USERNAME_PROP = "cb.username";
    public static final String CB_PWD_PROP = "cb.pwd";
    public static final String CB_ZK_PROP = "cb.zk";
    public static final String CB_INSTANCE_PROP = "cb.instance";
    public static final String PARTITION_TABLE_PROP = "partition.table";
    public static final String SHARD_INDEX_TABLE_PROP = "shard.index.table";
    public static final String SHARD_INDEX_DELETE_PROP = "shard.index.delete";

    private Configuration conf;

    private String userName = "root";
    private String pwd = "password";
    private String instance = "stratus";
    private String zk = "10.40.190.113:2181";
    private String partitionTable = "rdfPartition";
    private String shardIndexTable = "rdfShardIndex";

    public Configuration getConf() {
        return conf;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new Configuration(), new MoveShardIndexTool(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Runs the move job.
     *
     * @return {@code 0} on success, {@code 1} if the job failed (previously always {@code 0})
     */
    @Override
    public int run(String[] args) throws Exception {
        return runJob(args) < 0 ? 1 : 0;
    }

    /**
     * Configures and runs the map-only move job, blocking until it finishes.
     *
     * @return the number of mutations written (MAP_OUTPUT_RECORDS) on success, {@code -1} on failure
     */
    public long runJob(String[] args) throws Exception {
        //faster
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
        conf.set("io.sort.mb", "256");

        zk = conf.get(CB_ZK_PROP, zk);
        instance = conf.get(CB_INSTANCE_PROP, instance);
        userName = conf.get(CB_USERNAME_PROP, userName);
        pwd = conf.get(CB_PWD_PROP, pwd);
        partitionTable = conf.get(PARTITION_TABLE_PROP, partitionTable);
        shardIndexTable = conf.get(SHARD_INDEX_TABLE_PROP, shardIndexTable);
        // publish the resolved table names so the mappers see them
        conf.set(SHARD_INDEX_TABLE_PROP, shardIndexTable);
        conf.set(PARTITION_TABLE_PROP, partitionTable);

        Job job = new Job(conf);
        job.setJarByClass(MoveShardIndexTool.class);

        job.setInputFormatClass(CloudbaseInputFormat.class);
        //TODO: How should I send in Auths?
        CloudbaseInputFormat.setInputInfo(job, userName, pwd.getBytes(),
                partitionTable, CBConstants.NO_AUTHS);
        CloudbaseInputFormat.setZooKeeperInstance(job, instance, zk);
        CloudbaseInputFormat.setRanges(job, Collections.singleton(
                new Range(
                        new Text(PartitionConstants.URI_MARKER_STR),
                        new Text(PartitionConstants.PLAIN_LITERAL_MARKER_STR))));

        job.setMapperClass(ShardKeyValueToMutationMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Mutation.class);

        job.setOutputFormatClass(CloudbaseOutputFormat.class);
        CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, shardIndexTable);
        CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk);

        job.setNumReduceTasks(0);

        // Submit the job
        Date startTime = new Date();
        System.out.println("Job started: " + startTime);
        int exitCode = job.waitForCompletion(true) ? 0 : 1;

        if (exitCode == 0) {
            Date end_time = new Date();
            System.out.println("Job ended: " + end_time);
            System.out.println("The job took "
                    + (end_time.getTime() - startTime.getTime()) / 1000
                    + " seconds.");
            // Map-only job: REDUCE_OUTPUT_RECORDS is always 0, so report what the mappers wrote.
            return job
                    .getCounters()
                    .findCounter("org.apache.hadoop.mapred.Task$Counter",
                            "MAP_OUTPUT_RECORDS").getValue();
        } else {
            System.out.println("Job Failed!!!");
        }

        return -1;
    }

    /**
     * Copies each entry to the shard index table. If configured to, it also deletes the
     * entry from the partition table.
     */
    public static class ShardKeyValueToMutationMapper extends Mapper<Key, Value, Text, Mutation> {
        private Text shardTableTxt;
        private Text partTableTxt;
        protected boolean deletePrevShardIndex;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            String shardTable = context.getConfiguration().get(SHARD_INDEX_TABLE_PROP);
            if (shardTable == null)
                throw new IllegalArgumentException("Shard Table property cannot be null");

            shardTableTxt = new Text(shardTable);

            String partTable = context.getConfiguration().get(PARTITION_TABLE_PROP);
            if (partTable == null)
                throw new IllegalArgumentException("Partition Table property cannot be null");

            partTableTxt = new Text(partTable);

            deletePrevShardIndex = context.getConfiguration().getBoolean(SHARD_INDEX_DELETE_PROP, false);
            System.out.println("Deleting shard index from previous: " + deletePrevShardIndex + " Part: " + partTableTxt);
        }

        @Override
        protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
            ColumnVisibility visibility = new ColumnVisibility(key.getColumnVisibility());

            Mutation mutation = new Mutation(key.getRow());
            mutation.put(key.getColumnFamily(), key.getColumnQualifier(),
                    visibility, System.currentTimeMillis(), value);
            context.write(shardTableTxt, mutation);

            if (deletePrevShardIndex) {
                // The delete must carry the entry's visibility, or it will not match the stored key.
                Mutation delete = new Mutation(key.getRow());
                delete.putDelete(key.getColumnFamily(), key.getColumnQualifier(),
                        visibility, System.currentTimeMillis());
                context.write(partTableTxt, delete);
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java new file mode 100644 index 0000000..b347a56 --- /dev/null +++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java @@ -0,0 +1,155 @@ +package mvm.mmrts.rdf.partition.mr.fileinput; + +import mvm.mmrts.rdf.partition.utils.RdfIO; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.openrdf.model.Statement; +import org.openrdf.rio.*; + +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Be able to input multiple rdf formatted files. Convert from rdf format to statements. 
+ * Class RdfFileInputFormat + * Date: May 16, 2011 + * Time: 2:11:24 PM + */ +public class RdfFileInputFormat extends FileInputFormat<LongWritable, BytesWritable> { + + public static final String RDF_FILE_FORMAT = "mvm.mmrts.rdf.cloudbase.sail.mr.fileinput.rdfformat"; + + @Override + public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + return new RdfFileRecordReader(); + } + + private class RdfFileRecordReader extends RecordReader<LongWritable, BytesWritable> implements RDFHandler { + + boolean closed = false; + long count = 0; + BlockingQueue<BytesWritable> queue = new LinkedBlockingQueue<BytesWritable>(); + int total = 0; + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + FileSplit fileSplit = (FileSplit) inputSplit; + Configuration conf = taskAttemptContext.getConfiguration(); + String rdfForm_s = conf.get(RDF_FILE_FORMAT, RDFFormat.RDFXML.getName()); + RDFFormat rdfFormat = RDFFormat.valueOf(rdfForm_s); + + Path file = fileSplit.getPath(); + FileSystem fs = file.getFileSystem(conf); + FSDataInputStream fileIn = fs.open(fileSplit.getPath()); + + RDFParser rdfParser = Rio.createParser(rdfFormat); + rdfParser.setRDFHandler(this); + try { + rdfParser.parse(fileIn, ""); + } catch (Exception e) { + throw new IOException(e); + } + fileIn.close(); + total = queue.size(); + //TODO: Make this threaded so that you don't hold too many statements before sending them + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + return queue.size() > 0; + } + + @Override + public LongWritable getCurrentKey() throws IOException, InterruptedException { + return new LongWritable(count++); + } + + @Override + public BytesWritable getCurrentValue() throws IOException, InterruptedException { + return queue.poll(); + } + + @Override 
+ public float getProgress() throws IOException, InterruptedException { + return ((float) (total - queue.size())) / ((float) total); + } + + @Override + public void close() throws IOException { + closed = true; + } + + @Override + public void startRDF() throws RDFHandlerException { + } + + @Override + public void endRDF() throws RDFHandlerException { + } + + @Override + public void handleNamespace(String s, String s1) throws RDFHandlerException { + } + + @Override + public void handleStatement(Statement statement) throws RDFHandlerException { + try { + byte[] stmt_bytes = RdfIO.writeStatement(statement, true); + queue.add(new BytesWritable(stmt_bytes)); + } catch (IOException e) { + throw new RDFHandlerException(e); + } + } + + @Override + public void handleComment(String s) throws RDFHandlerException { + } + } +// +// public static RDFParser createRdfParser(RDFFormat rdfFormat) { +// if (RDFFormat.RDFXML.equals(rdfFormat)) { +// return new RDFXMLParserFactory().getParser(); +// } else if (RDFFormat.N3.equals(rdfFormat)) { +// return new N3ParserFactory().getParser(); +// } else if (RDFFormat.NTRIPLES.equals(rdfFormat)) { +// return new NTriplesParserFactory().getParser(); +// } else if (RDFFormat.TRIG.equals(rdfFormat)) { +// return new TriGParserFactory().getParser(); +// } else if (RDFFormat.TRIX.equals(rdfFormat)) { +// return new TriXParserFactory().getParser(); +// } else if (RDFFormat.TURTLE.equals(rdfFormat)) { +// return new TurtleParserFactory().getParser(); +// } +// throw new IllegalArgumentException("Unknown RDFFormat[" + rdfFormat + "]"); +// } +// +// public static RDFWriter createRdfWriter(RDFFormat rdfFormat, OutputStream os) { +// if (RDFFormat.RDFXML.equals(rdfFormat)) { +// return new RDFXMLWriterFactory().getWriter(os); +// } else if (RDFFormat.N3.equals(rdfFormat)) { +// return new N3WriterFactory().getWriter(os); +// } else if (RDFFormat.NTRIPLES.equals(rdfFormat)) { +// return new NTriplesWriterFactory().getWriter(os); +// } else if 
(RDFFormat.TRIG.equals(rdfFormat)) { +// return new TriGWriterFactory().getWriter(os); +// } else if (RDFFormat.TRIX.equals(rdfFormat)) { +// return new TriXWriterFactory().getWriter(os); +// } else if (RDFFormat.TURTLE.equals(rdfFormat)) { +// return new TurtleWriterFactory().getWriter(os); +// } +// throw new IllegalArgumentException("Unknown RDFFormat[" + rdfFormat + "]"); +// } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java new file mode 100644 index 0000000..12c1a4e --- /dev/null +++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java @@ -0,0 +1,210 @@ +package mvm.mmrts.rdf.partition.mr.fileinput; + +import cloudbase.core.client.mapreduce.CloudbaseOutputFormat; +import cloudbase.core.data.Mutation; +import com.google.common.io.ByteStreams; +import mvm.mmrts.rdf.partition.shard.DateHashModShardValueGenerator; +import mvm.mmrts.rdf.partition.utils.RdfIO; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.openrdf.model.Resource; +import org.openrdf.model.Statement; +import org.openrdf.model.impl.ValueFactoryImpl; + +import java.io.IOException; +import java.util.Date; + +import static 
mvm.mmrts.rdf.partition.PartitionConstants.*; +import static mvm.mmrts.rdf.partition.PartitionConstants.EMPTY_VALUE; +import static mvm.mmrts.rdf.partition.utils.RdfIO.writeStatement; +import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue; + +/** + * Do bulk import of rdf files + * Class RdfFileInputToCloudbaseTool + * Date: May 16, 2011 + * Time: 3:12:16 PM + */ +public class RdfFileInputToCloudbaseTool implements Tool { + + public static final String CB_USERNAME_PROP = "cb.username"; + public static final String CB_PWD_PROP = "cb.pwd"; + public static final String CB_SERVER_PROP = "cb.server"; + public static final String CB_PORT_PROP = "cb.port"; + public static final String CB_INSTANCE_PROP = "cb.instance"; + public static final String CB_TTL_PROP = "cb.ttl"; + public static final String CB_TABLE_PROP = "cb.table"; + + + private Configuration conf; + + private String userName = "root"; + private String pwd = "password"; + private String instance = "stratus"; + private String server = "10.40.190.113"; + private String port = "2181"; + private String table = "partitionRdf"; + + + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + public static void main(String[] args) { + try { + ToolRunner.run(new Configuration(), new RdfFileInputToCloudbaseTool(), args); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException { + //faster + conf.setBoolean("mapred.map.tasks.speculative.execution", false); + conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); + conf.set("io.sort.mb", "256"); + + server = conf.get(CB_SERVER_PROP, server); + port = conf.get(CB_PORT_PROP, port); + instance = conf.get(CB_INSTANCE_PROP, instance); + userName = conf.get(CB_USERNAME_PROP, userName); + pwd = conf.get(CB_PWD_PROP, pwd); + table = conf.get(CB_TABLE_PROP, table); + 
conf.set(CB_TABLE_PROP, table); + + Job job = new Job(conf); + job.setJarByClass(RdfFileInputToCloudbaseTool.class); + + // set up cloudbase input + job.setInputFormatClass(RdfFileInputFormat.class); + RdfFileInputFormat.addInputPath(job, new Path(args[0])); + + // set input output of the particular job + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(BytesWritable.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Mutation.class); + + job.setOutputFormatClass(CloudbaseOutputFormat.class); + CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, table); + CloudbaseOutputFormat.setZooKeeperInstance(job, instance, server + ":" + port); + + // set mapper and reducer classes + job.setMapperClass(OutSubjStmtMapper.class); + job.setReducerClass(StatementToMutationReducer.class); + + // set output +// Path outputDir = new Path("/temp/sparql-out/testout"); +// FileSystem dfs = FileSystem.get(outputDir.toUri(), conf); +// if (dfs.exists(outputDir)) +// dfs.delete(outputDir, true); +// +// FileOutputFormat.setOutputPath(job, outputDir); + + // Submit the job + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int exitCode = job.waitForCompletion(true) ? 
0 : 1; + + if (exitCode == 0) { + Date end_time = new Date(); + System.out.println("Job ended: " + end_time); + System.out.println("The job took " + + (end_time.getTime() - startTime.getTime()) / 1000 + + " seconds."); + return job + .getCounters() + .findCounter("org.apache.hadoop.mapred.Task$Counter", + "REDUCE_OUTPUT_RECORDS").getValue(); + } else { + System.out.println("Job Failed!!!"); + } + + return -1; + } + + @Override + public int run(String[] args) throws Exception { + runJob(args); + return 0; + } + + public static class OutSubjStmtMapper extends Mapper<LongWritable, BytesWritable, Text, BytesWritable> { + + public OutSubjStmtMapper() { + } + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + } + + @Override + protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException { + Statement statement = RdfIO.readStatement(ByteStreams.newDataInput(value.getBytes()), ValueFactoryImpl.getInstance()); + context.write(new Text(new String(writeValue(statement.getSubject())) + FAMILY_DELIM_STR), value); + } + + } + + public static class StatementToMutationReducer extends Reducer<Text, BytesWritable, Text, Mutation> { + private Text outputTable; + private DateHashModShardValueGenerator gen; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + outputTable = new Text(context.getConfiguration().get(CB_TABLE_PROP, null)); + gen = new DateHashModShardValueGenerator(); + } + + @Override + protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException { + Resource subject = (Resource) RdfIO.readValue(ByteStreams.newDataInput(key.getBytes()), ValueFactoryImpl.getInstance(), FAMILY_DELIM); + byte[] subj_bytes = writeValue(subject); + String shard = gen.generateShardValue(subject); + Text shard_txt = new Text(shard); + + /** + * 
Triple - > + *- < subject ><shard >: + *- < shard > event:<subject >\0 < predicate >\0 < object >\0 + *- < shard > index:<predicate >\1 < object >\0 + */ + Mutation m_subj = new Mutation(shard_txt); + for (BytesWritable stmt_bytes : values) { + Statement stmt = RdfIO.readStatement(ByteStreams.newDataInput(stmt_bytes.getBytes()), ValueFactoryImpl.getInstance()); + m_subj.put(DOC, new Text(writeStatement(stmt, true)), EMPTY_VALUE); + m_subj.put(INDEX, new Text(writeStatement(stmt, false)), EMPTY_VALUE); + } + + /** + * TODO: Is this right? + * If the subject does not have any authorizations specified, then anyone can access it. + * But the true authorization check will happen at the predicate/object level, which means that + * the set returned will only be what the person is authorized to see. The shard lookup table has to + * have the lowest level authorization all the predicate/object authorizations; otherwise, + * a user may not be able to see the correct document. + */ + Mutation m_shard = new Mutation(new Text(subj_bytes)); + m_shard.put(shard_txt, EMPTY_TXT, EMPTY_VALUE); + + context.write(outputTable, m_subj); + context.write(outputTable, m_shard); + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java new file mode 100644 index 0000000..e677d12 --- /dev/null +++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java @@ -0,0 +1,159 @@ +package mvm.mmrts.rdf.partition.mr.fileinput; + +import com.google.common.io.ByteStreams; +import mvm.mmrts.rdf.partition.utils.RdfIO; +import 
org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.openrdf.model.Statement; +import org.openrdf.model.impl.ValueFactoryImpl; + +import java.io.IOException; +import java.util.Date; + +import static mvm.mmrts.rdf.partition.PartitionConstants.FAMILY_DELIM_STR; +import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue; + +/** + * Do bulk import of rdf files + * Class RdfFileInputToCloudbaseTool + * Date: May 16, 2011 + * Time: 3:12:16 PM + */ +public class RdfFileInputToFileTool implements Tool { + + private Configuration conf; + + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + public static void main(String[] args) { + try { + ToolRunner.run(new Configuration(), new RdfFileInputToFileTool(), args); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException { + if (args.length < 2) + throw new IllegalArgumentException("Usage: RdfFileInputToFileTool <input directory> <output directory>"); + + //faster + conf.setBoolean("mapred.map.tasks.speculative.execution", false); + conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); + conf.set("io.sort.mb", "256"); + + Job job = new Job(conf); + job.setJarByClass(RdfFileInputToFileTool.class); + + // set up cloudbase input + 
job.setInputFormatClass(RdfFileInputFormat.class); + RdfFileInputFormat.addInputPath(job, new Path(args[0])); + + // set input output of the particular job + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Text.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(Text.class); + + + // set mapper and reducer classes + job.setMapperClass(StmtToBytesMapper.class); + job.setReducerClass(StmtBytesReducer.class); + + // set output + job.setOutputFormatClass(TextOutputFormat.class); + Path outputDir = new Path(args[1]); + FileSystem dfs = FileSystem.get(outputDir.toUri(), conf); + if (dfs.exists(outputDir)) + dfs.delete(outputDir, true); + + FileOutputFormat.setOutputPath(job, outputDir); + + // Submit the job + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int exitCode = job.waitForCompletion(true) ? 0 : 1; + + if (exitCode == 0) { + Date end_time = new Date(); + System.out.println("Job ended: " + end_time); + System.out.println("The job took " + + (end_time.getTime() - startTime.getTime()) / 1000 + + " seconds."); + return job + .getCounters() + .findCounter("org.apache.hadoop.mapred.Task$Counter", + "REDUCE_OUTPUT_RECORDS").getValue(); + } else { + System.out.println("Job Failed!!!"); + } + + return -1; + } + + @Override + public int run(String[] args) throws Exception { + runJob(args); + return 0; + } + + public static class StmtToBytesMapper extends Mapper<LongWritable, BytesWritable, Text, Text> { + + Text outKey = new Text(); + Text outValue = new Text(); + + public StmtToBytesMapper() { + } + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + } + + @Override + protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException { + Statement statement = RdfIO.readStatement(ByteStreams.newDataInput(value.getBytes()), ValueFactoryImpl.getInstance()); + outKey.set(new 
String(writeValue(statement.getSubject())) + FAMILY_DELIM_STR); + outValue.set(value.getBytes()); + context.write(outKey, outValue); + } + + } + + public static class StmtBytesReducer extends Reducer<Text, Text, NullWritable, Text> { + + NullWritable outKey = NullWritable.get(); + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + } + + @Override + protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { + for (Text stmt_txt : values) { + context.write(outKey, stmt_txt); + } + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/BulkNtripsInputTool.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/BulkNtripsInputTool.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/BulkNtripsInputTool.java new file mode 100644 index 0000000..fea882d --- /dev/null +++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/BulkNtripsInputTool.java @@ -0,0 +1,326 @@ +package mvm.mmrts.rdf.partition.mr.fileinput.bulk; + +import cloudbase.core.client.Connector; +import cloudbase.core.client.Instance; +import cloudbase.core.client.ZooKeeperInstance; +import cloudbase.core.client.mapreduce.CloudbaseFileOutputFormat; +import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner; +import cloudbase.core.data.Key; +import cloudbase.core.data.Value; +import cloudbase.core.util.TextUtil; +import com.google.common.base.Preconditions; +import mvm.rya.cloudbase.utils.bulk.KeyRangePartitioner; +import mvm.mmrts.rdf.partition.shard.DateHashModShardValueGenerator; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.openrdf.model.Statement; +import org.openrdf.rio.RDFHandler; +import org.openrdf.rio.RDFHandlerException; +import org.openrdf.rio.RDFParser; +import org.openrdf.rio.ntriples.NTriplesParserFactory; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.StringReader; +import java.util.Collection; + +import static mvm.mmrts.rdf.partition.PartitionConstants.*; +import static mvm.mmrts.rdf.partition.utils.RdfIO.writeStatement; +import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue; + +/** + * Take large ntrips files and use MapReduce and Cloudbase + * Bulk ingest techniques to load into the table in our partition format. 
+ * <p/> + * Input: NTrips file + * Map: + * - key : shard row - Text + * - value : stmt in doc triple format - Text + * Partitioner: RangePartitioner + * Reduce: + * - key : all the entries for each triple - Cloudbase Key + * Class BulkNtripsInputTool + * Date: Sep 13, 2011 + * Time: 10:00:17 AM + */ +public class BulkNtripsInputTool extends Configured implements Tool { + + private static DateHashModShardValueGenerator generator = new DateHashModShardValueGenerator(); + public static final String BASE_MOD = "baseMod"; + + @Override + public int run(String[] args) throws Exception { + Preconditions.checkArgument(args.length >= 7, "Usage: hadoop jar jarfile BulkNtripsInputTool <cb instance>" + + " <zookeepers> <username> <password> <output table> <hdfs ntrips dir> <work dir> (<shard size>)"); + + Configuration conf = getConf(); + PrintStream out = null; + try { + Job job = new Job(conf, "Bulk Ingest NTrips to Partition RDF"); + job.setJarByClass(this.getClass()); + + //setting long job + job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false); + job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false); + job.getConfiguration().set("io.sort.mb", "256"); + + job.setInputFormatClass(TextInputFormat.class); + + job.setMapperClass(ParseNtripsMapper.class); + job.setMapOutputKeyClass(Key.class); + job.setMapOutputValueClass(Value.class); + + job.setCombinerClass(OutStmtMutationsReducer.class); + job.setReducerClass(OutStmtMutationsReducer.class); + job.setOutputFormatClass(CloudbaseFileOutputFormat.class); + CloudbaseFileOutputFormat.setZooKeeperInstance(job, args[0], args[1]); + + Instance instance = new ZooKeeperInstance(args[0], args[1]); + String user = args[2]; + byte[] pass = args[3].getBytes(); + String tableName = args[4]; + String inputDir = args[5]; + String workDir = args[6]; + if(args.length > 7) { + int baseMod = Integer.parseInt(args[7]); + generator.setBaseMod(baseMod); + 
job.getConfiguration().setInt(BASE_MOD, baseMod); + } + + Connector connector = instance.getConnector(user, pass); + + TextInputFormat.setInputPaths(job, new Path(inputDir)); + + FileSystem fs = FileSystem.get(conf); + Path workPath = new Path(workDir); + if (fs.exists(workPath)) + fs.delete(workPath, true); + + CloudbaseFileOutputFormat.setOutputPath(job, new Path(workDir + "/files")); + + out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt")))); + + Collection<Text> splits = connector.tableOperations().getSplits(tableName, Integer.MAX_VALUE); + for (Text split : splits) + out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split)))); + + job.setNumReduceTasks(splits.size() + 1); + out.close(); + + job.setPartitionerClass(KeyRangePartitioner.class); + RangePartitioner.setSplitFile(job, workDir + "/splits.txt"); + + job.waitForCompletion(true); + + connector.tableOperations().importDirectory( + tableName, + workDir + "/files", + workDir + "/failures", + 20, + 4, + false); + + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + if (out != null) + out.close(); + } + + return 0; + } + + public static void main(String[] args) { + try { + ToolRunner.run(new Configuration(), new BulkNtripsInputTool(), args); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * input: ntrips format triple + * <p/> + * output: key: shard row from generator + * value: stmt in serialized format (document format) + */ + public static class ParseNtripsMapper extends Mapper<LongWritable, Text, Key, Value> { + private static final NTriplesParserFactory N_TRIPLES_PARSER_FACTORY = new NTriplesParserFactory(); + + private Text outputKey = new Text(); + private Text outputValue = new Text(); + private RDFParser parser; + private static byte[] EMPTY_BYTE_ARRAY = new byte[0]; + + @Override + protected void setup(final Context context) throws IOException, InterruptedException { + super.setup(context); + Configuration 
conf = context.getConfiguration(); + generator.setBaseMod(conf.getInt(BASE_MOD, generator.getBaseMod())); + parser = N_TRIPLES_PARSER_FACTORY.getParser(); + parser.setRDFHandler(new RDFHandler() { + + @Override + public void startRDF() throws RDFHandlerException { + + } + + @Override + public void endRDF() throws RDFHandlerException { + + } + + @Override + public void handleNamespace(String s, String s1) throws RDFHandlerException { + + } + + @Override + public void handleStatement(Statement statement) throws RDFHandlerException { + try { +// byte[] doc_serialized = writeStatement(statement, true); + Text shard = new Text(generator.generateShardValue(statement.getSubject())); + + context.write(new Key(shard, DOC, new Text(writeStatement(statement, true))), EMPTY_VALUE); + context.write(new Key(shard, INDEX, new Text(writeStatement(statement, false))), EMPTY_VALUE); + //TODO: Wish we didn't have to do this constantly, probably better to just aggregate all subjects and do it once + context.write(new Key(new Text(writeValue(statement.getSubject())), shard, EMPTY_TXT), EMPTY_VALUE); + +// outputKey.set(key); +// outputValue.set(doc_serialized); +// context.write(outputKey, outputValue); +// outputKey.set(writeValue(statement.getSubject())); +// outputValue.set(EMPTY_BYTE_ARRAY); +// context.write(outputKey, outputValue); + } catch (Exception e) { + throw new RDFHandlerException(e); + } + } + + @Override + public void handleComment(String s) throws RDFHandlerException { + + } + }); + } + + @Override + public void map(LongWritable key, Text value, Context output) + throws IOException, InterruptedException { + try { + parser.parse(new StringReader(value.toString()), ""); + } catch (Exception e) { + throw new IOException("Exception occurred parsing ntrips triple[" + value + "]"); + } + } + } + + public static class OutStmtMutationsReducer extends Reducer<Key, Value, Key, Value> { + + public void reduce(Key key, Iterable<Value> values, Context output) + throws IOException, 
InterruptedException { + output.write(key, EMPTY_VALUE); +// System.out.println(key); +// for (Value value : values) { +// System.out.println(value); + /** + * Each of these is a triple. + * 1. format back to statement + * 2. Output the doc,index key,value pairs for each triple + */ +// Statement stmt = readStatement(ByteStreams.newDataInput(value.getBytes()), VALUE_FACTORY); +// output.write(new Key(shardKey, DOC, new Text(writeStatement(stmt, true))), EMPTY_VALUE); +// output.write(new Key(shardKey, INDEX, new Text(writeStatement(stmt, false))), EMPTY_VALUE); +// //TODO: Wish we didn't have to do this constantly, probably better to just aggregate all subjects and do it once +// output.write(new Key(new Text(writeValue(stmt.getSubject())), shardKey, EMPTY_TXT), EMPTY_VALUE); +// } + } + } + + public static class EmbedKeyGroupingComparator implements RawComparator<Text> { + + public EmbedKeyGroupingComparator() { + + } + + @Override + public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, + int arg5) { + DataInputBuffer n = new DataInputBuffer(); + + Text temp1 = new Text(); + Text temp2 = new Text(); + + try { + n.reset(arg0, arg1, arg2); + temp1.readFields(n); + n.reset(arg3, arg4, arg5); + temp2.readFields(n); + } catch (IOException e) { + // TODO Auto-generated catch block + //e.printStackTrace(); + throw new RuntimeException(e); + } + + return compare(temp1, temp2); + } + + @Override + public int compare(Text a1, Text a2) { + return EmbedKeyRangePartitioner.retrieveEmbedKey(a1).compareTo(EmbedKeyRangePartitioner.retrieveEmbedKey(a2)); + } + + } + + /** + * Really it does a normal Text compare + */ + public static class EmbedKeySortComparator implements RawComparator<Text> { + + public EmbedKeySortComparator() { + + } + + @Override + public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, + int arg5) { + DataInputBuffer n = new DataInputBuffer(); + + Text temp1 = new Text(); + Text temp2 = new Text(); + + try { + 
n.reset(arg0, arg1, arg2); + temp1.readFields(n); + n.reset(arg3, arg4, arg5); + temp2.readFields(n); + } catch (IOException e) { + // TODO Auto-generated catch block + //e.printStackTrace(); + throw new RuntimeException(e); + } + + return compare(temp1, temp2); + } + + @Override + public int compare(Text a1, Text a2) { + return a1.compareTo(a2); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitioner.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitioner.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitioner.java new file mode 100644 index 0000000..f72c382 --- /dev/null +++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitioner.java @@ -0,0 +1,28 @@ +package mvm.mmrts.rdf.partition.mr.fileinput.bulk; + +import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner; +import mvm.mmrts.rdf.partition.PartitionConstants; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * Class EmbedKeyRangePartitioner + * Date: Sep 13, 2011 + * Time: 1:49:35 PM + */ +public class EmbedKeyRangePartitioner extends RangePartitioner { + @Override + public int getPartition(Text key, Writable value, int numPartitions) { + Text embedKey = retrieveEmbedKey(key); + return super.getPartition(embedKey, value, numPartitions); + } + + public static Text retrieveEmbedKey(Text key) { + int split = key.find(PartitionConstants.INDEX_DELIM_STR); + if (split < 0) + return key; + Text newText = new Text(); + newText.append(key.getBytes(), 0, split); + return newText; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/iterators/SortedEncodedRangeIterator.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/iterators/SortedEncodedRangeIterator.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/iterators/SortedEncodedRangeIterator.java new file mode 100644 index 0000000..a83d594 --- /dev/null +++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/iterators/SortedEncodedRangeIterator.java @@ -0,0 +1,45 @@ +package mvm.mmrts.rdf.partition.mr.iterators; + +import cloudbase.core.data.Key; +import cloudbase.core.data.Value; +import cloudbase.core.iterators.IteratorEnvironment; +import cloudbase.core.iterators.SortedKeyValueIterator; +import cloudbase.core.util.TextUtil; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.io.Text; +import ss.cloudbase.core.iterators.SortedRangeIterator; + +import java.io.IOException; +import java.util.Map; + +/** + * Class SortedEncodedRangeIterator + * Date: Sep 8, 2011 + * Time: 6:01:28 PM + */ +public class SortedEncodedRangeIterator extends SortedRangeIterator { + + @Override + public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException { + super.init(source, options, env); + if (options.containsKey(OPTION_LOWER_BOUND)) { + lower = new Text(decode(options.get(OPTION_LOWER_BOUND))); + } else { + lower = new Text("\u0000"); + } + + if (options.containsKey(OPTION_UPPER_BOUND)) { + upper = new Text(decode(options.get(OPTION_UPPER_BOUND))); + } else { + upper = new Text("\u0000"); + } + } + + public static String encode(String txt) { + return new String(Base64.encodeBase64(txt.getBytes())); + } + + public static String decode(String txt) { + return new 
String(Base64.decodeBase64(txt.getBytes())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectCombiner.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectCombiner.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectCombiner.java new file mode 100644 index 0000000..e360ca7 --- /dev/null +++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectCombiner.java @@ -0,0 +1,31 @@ +package mvm.mmrts.rdf.partition.mr.transform; + +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapreduce.Reducer; + +import java.io.IOException; +import java.util.Map; + +/** + * Since each subject is located at most on one tablet, we should be able to assume that + * no reducer is needed. The Combine phase should aggregate properly. 
+ * <p/> + * Class AggregateTriplesBySubjectReducer + * Date: Sep 1, 2011 + * Time: 5:39:24 PM + */ +public class AggregateTriplesBySubjectCombiner extends Reducer<Text, MapWritable, Text, MapWritable> { +// private LongWritable lwout = new LongWritable(); + private MapWritable mwout = new MapWritable(); + + @Override + protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException { + for (MapWritable value : values) { + for (Map.Entry<Writable, Writable> entry : value.entrySet()) { + mwout.put(WritableUtils.clone(entry.getKey(), context.getConfiguration()), + WritableUtils.clone(entry.getValue(), context.getConfiguration())); + } + } + context.write(key, mwout); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectReducer.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectReducer.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectReducer.java new file mode 100644 index 0000000..2ea5fa8 --- /dev/null +++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectReducer.java @@ -0,0 +1,37 @@ +package mvm.mmrts.rdf.partition.mr.transform; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapreduce.Reducer; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static mvm.mmrts.rdf.partition.mr.transform.SparqlCloudbaseIFTransformerConstants.SELECT_FILTER; + +/** + * Since each subject is located at most on one tablet, we should be able to assume that + * no reducer is needed. The Combine phase should aggregate properly. 
+ * <p/> + * Class AggregateTriplesBySubjectReducer + * Date: Sep 1, 2011 + * Time: 5:39:24 PM + */ +public class AggregateTriplesBySubjectReducer extends Reducer<Text, MapWritable, LongWritable, MapWritable> { + private LongWritable lwout = new LongWritable(); + private MapWritable mwout = new MapWritable(); + + @Override + protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException { + for (MapWritable value : values) { + for (Map.Entry<Writable, Writable> entry : value.entrySet()) { + mwout.put(WritableUtils.clone(entry.getKey(), context.getConfiguration()), + WritableUtils.clone(entry.getValue(), context.getConfiguration())); + } + } + lwout.set(key.hashCode()); + context.write(lwout, mwout); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/KeyValueToMapWrMapper.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/KeyValueToMapWrMapper.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/KeyValueToMapWrMapper.java new file mode 100644 index 0000000..0630501 --- /dev/null +++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/KeyValueToMapWrMapper.java @@ -0,0 +1,78 @@ +package mvm.mmrts.rdf.partition.mr.transform; + +import cloudbase.core.data.Key; +import cloudbase.core.data.Value; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.io.ByteStreams; +import mvm.mmrts.rdf.partition.PartitionConstants; +import mvm.mmrts.rdf.partition.utils.RdfIO; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.openrdf.model.Statement; + +import java.io.IOException; 
+import java.util.*; + +import static mvm.mmrts.rdf.partition.mr.transform.SparqlCloudbaseIFTransformerConstants.*; + +/** + * Will take a triple and output: <subject, predObj map> + * <p/> + * Class KeyValueToMapWrMapper + * Date: Sep 1, 2011 + * Time: 4:56:42 PM + */ +public class KeyValueToMapWrMapper extends Mapper<Key, Value, Text, MapWritable> { + +// private List<String> predicateFilter = new ArrayList<String>(); + + private Text subjNameTxt; + private Text keyout = new Text(); + private Text predout = new Text(); + private Text objout = new Text(); + + private Map<String, String> predValueName = new HashMap(); + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + //find the values to filter on + Configuration conf = context.getConfiguration(); + String[] filter = conf.getStrings(SELECT_FILTER); + if (filter != null) { + for (String predValue : filter) { + String predName = conf.get(predValue); + if (predName != null) + predValueName.put(predValue, predName); + } + } + + String subjName = conf.get(SUBJECT_NAME); + if (subjName != null) { + //not sure it will ever be null + subjNameTxt = new Text(subjName); + } + } + + @Override + protected void map(Key key, Value value, Context context) throws IOException, InterruptedException { + Statement stmt = RdfIO.readStatement(ByteStreams.newDataInput(key.getColumnQualifier().getBytes()), PartitionConstants.VALUE_FACTORY); + String predName = predValueName.get(stmt.getPredicate().stringValue()); + if (predName == null) + return; + + keyout.set(stmt.getSubject().stringValue()); + predout.set(predName); + objout.set(stmt.getObject().stringValue()); + MapWritable mw = new MapWritable(); + mw.put(predout, objout); + if (subjNameTxt != null) { + mw.put(subjNameTxt, keyout); + } + context.write(keyout, mw); + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFJob.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFJob.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFJob.java new file mode 100644 index 0000000..56014f9 --- /dev/null +++ b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFJob.java @@ -0,0 +1,118 @@ +package mvm.mmrts.rdf.partition.mr.transform; + +import cloudbase.core.util.ArgumentChecker; +import mvm.mmrts.rdf.partition.query.evaluation.FilterTimeIndexVisitor; +import mvm.mmrts.rdf.partition.query.evaluation.SubjectGroupingOptimizer; +import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.QueryModelNode; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.parser.QueryParser; +import org.openrdf.query.parser.sparql.SPARQLParserFactory; + +/** + * Class SparqlCloudbaseIFJob + * Date: Sep 1, 2011 + * Time: 6:04:35 PM + */ +public class SparqlCloudbaseIFJob { + + private String[] queries; + private String table; + + //Cloudbase properties + private String userName; + private String pwd; + private String instance; + private String zk; + // + + private Class classOriginal; //Calling class for this job. 
+ private String outputPath; + + public SparqlCloudbaseIFJob(String table, String userName, String pwd, String instance, String zk, + String outputPath, Class classOriginal, String... queries) { + ArgumentChecker.notNull(queries); + this.queries = queries; + this.table = table; + this.userName = userName; + this.pwd = pwd; + this.instance = instance; + this.zk = zk; + this.outputPath = outputPath; + this.classOriginal = classOriginal; + } + + public String[] run() throws Exception { + int count = 0; + outputPath = outputPath + "/results/"; + String[] resultsOut = new String[queries.length]; + + for (String query : queries) { + QueryParser parser = (new SPARQLParserFactory()).getParser(); + TupleExpr expr = parser.parseQuery(query, "http://www.w3.org/1999/02/22-rdf-syntax-ns#").getTupleExpr(); + + final Configuration queryConf = new Configuration(); + expr.visit(new FilterTimeIndexVisitor(queryConf)); + + (new SubjectGroupingOptimizer(queryConf)).optimize(expr, null, null); + + //make sure of only one shardlookup + expr.visit(new QueryModelVisitorBase<RuntimeException>() { + int count = 0; + + @Override + public void meetOther(QueryModelNode node) throws RuntimeException { + super.meetOther(node); + count++; + if (count > 1) + throw new IllegalArgumentException("Query can only have one subject-star lookup"); + } + }); + + final Job job = new Job(queryConf); + job.setJarByClass(classOriginal); + job.setJobName("SparqlCloudbaseIFTransformer. Query: " + ((query.length() > 32) ? 
(query.substring(0, 32)) : (query))); + + expr.visit(new QueryModelVisitorBase<RuntimeException>() { + @Override + public void meetOther(QueryModelNode node) throws RuntimeException { + super.meetOther(node); + + //set up CloudbaseBatchScannerInputFormat here + if (node instanceof ShardSubjectLookup) { + System.out.println("Lookup: " + node); + try { + new SparqlCloudbaseIFTransformer((ShardSubjectLookup) node, queryConf, job, table, + userName, pwd, instance, zk); + } catch (QueryEvaluationException e) { + e.printStackTrace(); + } + } + } + }); + + + String resultOutPath = outputPath + "/result-" + count; + resultsOut[count] = resultOutPath; + Path outputDir = new Path(resultOutPath); + FileSystem dfs = FileSystem.get(outputDir.toUri(), queryConf); + if (dfs.exists(outputDir)) + dfs.delete(outputDir, true); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + SequenceFileOutputFormat.setOutputPath(job, outputDir); + + + // Submit the job + job.waitForCompletion(true); + count++; + } + return resultsOut; + } +}
