http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java new file mode 100644 index 0000000..c03b124 --- /dev/null +++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java @@ -0,0 +1,318 @@ +package mvm.rya.cloudbase.mr.fileinput; + +import cloudbase.core.client.Connector; +import cloudbase.core.client.ZooKeeperInstance; +import cloudbase.core.client.admin.TableOperations; +import cloudbase.core.client.mapreduce.CloudbaseFileOutputFormat; +import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner; +import cloudbase.core.data.Key; +import cloudbase.core.data.Value; +import cloudbase.core.util.TextUtil; +import com.google.common.base.Preconditions; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.resolver.RdfToRyaConversions; +import mvm.rya.api.resolver.RyaContext; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolver; +import mvm.rya.cloudbase.CloudbaseRdfConstants; +import mvm.rya.cloudbase.mr.utils.MRUtils; +import mvm.rya.cloudbase.utils.bulk.KeyRangePartitioner; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.openrdf.model.Statement; +import org.openrdf.rio.*; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkNotNull; +import static mvm.rya.cloudbase.CloudbaseRdfUtils.extractValue; +import static mvm.rya.cloudbase.CloudbaseRdfUtils.from; + +/** + * Take large ntrips files and use MapReduce and Cloudbase + * Bulk ingest techniques to load into the table in our partition format. 
+ * <p/> + * Input: NTrips file + * Map: + * - key : shard row - Text + * - value : stmt in doc triple format - Text + * Partitioner: RangePartitioner + * Reduce: + * - key : all the entries for each triple - Cloudbase Key + * Class BulkNtripsInputTool + * Date: Sep 13, 2011 + * Time: 10:00:17 AM + */ +public class BulkNtripsInputTool extends Configured implements Tool { + + public static final String WORKDIR_PROP = "bulk.n3.workdir"; + + private String userName = "root"; + private String pwd = "password"; + private String instance = "stratus"; + private String zk = "10.40.190.129:2181"; + private String ttl = null; + private String workDirBase = "/temp/bulkcb/work"; + private String format = RDFFormat.NTRIPLES.getName(); + + @Override + public int run(final String[] args) throws Exception { + final Configuration conf = getConf(); + try { + //conf + zk = conf.get(MRUtils.CB_ZK_PROP, zk); + ttl = conf.get(MRUtils.CB_TTL_PROP, ttl); + instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance); + userName = conf.get(MRUtils.CB_USERNAME_PROP, userName); + pwd = conf.get(MRUtils.CB_PWD_PROP, pwd); + workDirBase = conf.get(WORKDIR_PROP, workDirBase); + format = conf.get(MRUtils.FORMAT_PROP, format); + conf.set(MRUtils.FORMAT_PROP, format); + final String inputDir = args[0]; + + ZooKeeperInstance zooKeeperInstance = new ZooKeeperInstance(instance, zk); + Connector connector = zooKeeperInstance.getConnector(userName, pwd); + TableOperations tableOperations = connector.tableOperations(); + + String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null); + if (tablePrefix != null) + RdfCloudTripleStoreConstants.prefixTables(tablePrefix); + String[] tables = {tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX, + tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, + tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX}; + Collection<Job> jobs = new ArrayList<Job>(); + for (final String tableName : tables) { + PrintStream out = null; + try { + String workDir = workDirBase + "/" + tableName; + System.out.println("Loading data into table[" + tableName + "]"); + + Job job = new Job(new Configuration(conf), "Bulk Ingest load data to Generic RDF Table[" + tableName + "]"); + job.setJarByClass(this.getClass()); + //setting long job + Configuration jobConf = job.getConfiguration(); + jobConf.setBoolean("mapred.map.tasks.speculative.execution", false); + jobConf.setBoolean("mapred.reduce.tasks.speculative.execution", false); + jobConf.set("io.sort.mb", jobConf.get("io.sort.mb", "256")); + jobConf.setBoolean("mapred.compress.map.output", true); +// jobConf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); //TODO: I would like LZO compression + + job.setInputFormatClass(TextInputFormat.class); + + job.setMapperClass(ParseNtripsMapper.class); + job.setMapOutputKeyClass(Key.class); + job.setMapOutputValueClass(Value.class); + + job.setCombinerClass(OutStmtMutationsReducer.class); + job.setReducerClass(OutStmtMutationsReducer.class); + job.setOutputFormatClass(CloudbaseFileOutputFormat.class); + CloudbaseFileOutputFormat.setZooKeeperInstance(job, instance, zk); + + jobConf.set(ParseNtripsMapper.TABLE_PROPERTY, tableName); + + TextInputFormat.setInputPaths(job, new Path(inputDir)); + + FileSystem fs = FileSystem.get(conf); + Path workPath = new Path(workDir); + if (fs.exists(workPath)) + fs.delete(workPath, true); + + CloudbaseFileOutputFormat.setOutputPath(job, new Path(workDir + "/files")); + + out = new PrintStream(new BufferedOutputStream(fs.create(new 
Path(workDir + "/splits.txt")))); + + if (!tableOperations.exists(tableName)) + tableOperations.create(tableName); + Collection<Text> splits = tableOperations.getSplits(tableName, Integer.MAX_VALUE); + for (Text split : splits) + out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split)))); + + job.setNumReduceTasks(splits.size() + 1); + out.close(); + + job.setPartitionerClass(KeyRangePartitioner.class); + RangePartitioner.setSplitFile(job, workDir + "/splits.txt"); + + jobConf.set(WORKDIR_PROP, workDir); + + job.submit(); + jobs.add(job); + + } catch (Exception re) { + throw new RuntimeException(re); + } finally { + if (out != null) + out.close(); + } + } + + for (Job job : jobs) { + while (!job.isComplete()) { + Thread.sleep(1000); + } + } + + for (String tableName : tables) { + String workDir = workDirBase + "/" + tableName; + tableOperations.importDirectory( + tableName, + workDir + "/files", + workDir + "/failures", + 20, + 4, + false); + } + + } catch (Exception e) { + throw new RuntimeException(e); + } + + return 0; + } + + public static void main(String[] args) { + try { + ToolRunner.run(new Configuration(), new BulkNtripsInputTool(), args); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * input: ntrips format triple + * <p/> + * output: key: shard row from generator + * value: stmt in serialized format (document format) + */ + public static class ParseNtripsMapper extends Mapper<LongWritable, Text, Key, Value> { + public static final String TABLE_PROPERTY = "parsentripsmapper.table"; + + private RDFParser parser; + private String rdfFormat; + private String namedGraph; + private RyaContext ryaContext = RyaContext.getInstance(); + private TripleRowResolver rowResolver = ryaContext.getTripleResolver(); + + @Override + protected void setup(final Context context) throws IOException, InterruptedException { + super.setup(context); + Configuration conf = context.getConfiguration(); + final String table = conf.get(TABLE_PROPERTY); + Preconditions.checkNotNull(table, "Set the " + TABLE_PROPERTY + " property on the map reduce job"); + + final String cv_s = conf.get(MRUtils.CB_CV_PROP); + final byte[] cv = cv_s == null ? null : cv_s.getBytes(); + rdfFormat = conf.get(MRUtils.FORMAT_PROP); + checkNotNull(rdfFormat, "Rdf format cannot be null"); + + namedGraph = conf.get(MRUtils.NAMED_GRAPH_PROP); + + parser = Rio.createParser(RDFFormat.valueOf(rdfFormat)); + parser.setParserConfig(new ParserConfig(true, true, true, RDFParser.DatatypeHandling.VERIFY)); + parser.setRDFHandler(new RDFHandler() { + + @Override + public void startRDF() throws RDFHandlerException { + + } + + @Override + public void endRDF() throws RDFHandlerException { + + } + + @Override + public void handleNamespace(String s, String s1) throws RDFHandlerException { + + } + + @Override + public void handleStatement(Statement statement) throws RDFHandlerException { + try { + RyaStatement rs = RdfToRyaConversions.convertStatement(statement); + if(rs.getColumnVisibility() == null) { + rs.setColumnVisibility(cv); + } + + // Inject the specified context into the statement. 
+ if(namedGraph != null){ + rs.setContext(new RyaURI(namedGraph)); + } + + Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT,TripleRow> serialize = rowResolver.serialize(rs); + + if (table.contains(RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX)) { + TripleRow tripleRow = serialize.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO); + context.write( + from(tripleRow), + extractValue(tripleRow) + ); + } else if (table.contains(RdfCloudTripleStoreConstants.TBL_PO_SUFFIX)) { + TripleRow tripleRow = serialize.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO); + context.write( + from(tripleRow), + extractValue(tripleRow) + ); + } else if (table.contains(RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX)) { + TripleRow tripleRow = serialize.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP); + context.write( + from(tripleRow), + extractValue(tripleRow) + ); + } else + throw new IllegalArgumentException("Unrecognized table[" + table + "]"); + + } catch (Exception e) { + throw new RDFHandlerException(e); + } + } + + @Override + public void handleComment(String s) throws RDFHandlerException { + + } + }); + } + + @Override + public void map(LongWritable key, Text value, Context output) + throws IOException, InterruptedException { + String rdf = value.toString(); + try { + parser.parse(new StringReader(rdf), ""); + } catch (RDFParseException e) { + System.out.println("Line[" + rdf + "] cannot be formatted with format[" + rdfFormat + "]. Exception[" + e.getMessage() + "]"); + } catch (Exception e) { + e.printStackTrace(); + throw new IOException("Exception occurred parsing triple[" + rdf + "]"); + } + } + } + + public static class OutStmtMutationsReducer extends Reducer<Key, Value, Key, Value> { + + public void reduce(Key key, Iterable<Value> values, Context output) + throws IOException, InterruptedException { + output.write(key, CloudbaseRdfConstants.EMPTY_VALUE); + } + } +}
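For reference, a minimal driver sketch (not part of the patch above) showing how the configuration keys read in BulkNtripsInputTool.run() might be supplied through ToolRunner; the ZooKeeper quorum, instance name, credentials, table prefix, and input path below are illustrative placeholders rather than Rya defaults.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.openrdf.rio.RDFFormat;

import mvm.rya.cloudbase.mr.fileinput.BulkNtripsInputTool;
import mvm.rya.cloudbase.mr.utils.MRUtils;

public class BulkNtripsIngestDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Cloudbase connection settings consumed by BulkNtripsInputTool.run()
        conf.set(MRUtils.CB_ZK_PROP, "zoo1:2181");         // placeholder ZooKeeper quorum
        conf.set(MRUtils.CB_INSTANCE_PROP, "cbInstance");  // placeholder Cloudbase instance
        conf.set(MRUtils.CB_USERNAME_PROP, "root");        // placeholder credentials
        conf.set(MRUtils.CB_PWD_PROP, "secret");
        // Table prefix and RDF serialization of the input files
        conf.set(MRUtils.TABLE_PREFIX_PROPERTY, "rya_");
        conf.set(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.getName());
        // HDFS scratch directory where the tool writes its bulk-import files and splits.txt
        conf.set(BulkNtripsInputTool.WORKDIR_PROP, "/temp/bulkcb/work");
        // args[0] of the tool is the HDFS directory holding the N-Triples files
        int rc = ToolRunner.run(conf, new BulkNtripsInputTool(), new String[]{"/data/ntriples"});
        System.exit(rc);
    }
}

Note that the tool submits one bulk-ingest job per index table (SPO, PO, OSP), waits for all of them to complete, and only then calls importDirectory on each table's generated files.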
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java new file mode 100644 index 0000000..5aed4a2 --- /dev/null +++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java @@ -0,0 +1,230 @@ +package mvm.rya.cloudbase.mr.fileinput; + +import cloudbase.core.client.mapreduce.CloudbaseOutputFormat; +import cloudbase.core.data.Mutation; +import cloudbase.core.security.ColumnVisibility; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.RdfToRyaConversions; +import mvm.rya.cloudbase.CloudbaseRdfConstants; +import mvm.rya.cloudbase.RyaTableMutationsFactory; +import mvm.rya.cloudbase.mr.utils.MRUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.openrdf.model.Statement; +import org.openrdf.rio.*; + +import java.io.IOException; +import java.io.StringReader; +import java.util.Collection; +import java.util.Date; +import java.util.Map; + +/** + * Do bulk import of rdf files + * Class RdfFileInputTool2 + * Date: May 16, 2011 + * Time: 3:12:16 PM + */ +public class RdfFileInputByLineTool implements Tool { + + private Configuration conf = new Configuration(); + + private String userName = "root"; + private String pwd = "password"; + private String instance = "stratus"; + private String zk = "10.40.190.113:2181"; + private String tablePrefix = null; + private RDFFormat format = RDFFormat.NTRIPLES; + + public Configuration getConf() { + return conf; + } + + public void setConf(Configuration conf) { + this.conf = conf; + } + + public static void main(String[] args) { + try { + ToolRunner.run(new Configuration(), new RdfFileInputByLineTool(), args); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException { + conf.setBoolean("mapred.map.tasks.speculative.execution", false); + conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); + conf.set("io.sort.mb", "256"); + conf.setLong("mapred.task.timeout", 600000000); + + zk = conf.get(MRUtils.CB_ZK_PROP, zk); + instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance); + userName = conf.get(MRUtils.CB_USERNAME_PROP, userName); + pwd = conf.get(MRUtils.CB_PWD_PROP, pwd); + format = RDFFormat.valueOf(conf.get(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.getName())); + + String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF); + + Job job = new Job(conf); + job.setJarByClass(RdfFileInputByLineTool.class); + + // set up cloudbase input + job.setInputFormatClass(TextInputFormat.class); + FileInputFormat.addInputPath(job, new Path(args[0])); + + // set input output of the particular job + job.setMapOutputKeyClass(Text.class); + 
job.setMapOutputValueClass(Mutation.class); +// job.setOutputKeyClass(LongWritable.class); +// job.setOutputValueClass(StatementWritable.class); + + job.setOutputFormatClass(CloudbaseOutputFormat.class); + CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); + CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk); + + // set mapper and reducer classes + job.setMapperClass(TextToMutationMapper.class); + job.setNumReduceTasks(0); +// job.setReducerClass(Reducer.class); + + // set output +// Path outputDir = new Path("/temp/sparql-out/testout"); +// FileSystem dfs = FileSystem.get(outputDir.toUri(), conf); +// if (dfs.exists(outputDir)) +// dfs.deleteMutation(outputDir, true); +// +// FileOutputFormat.setOutputPath(job, outputDir); + + // Submit the job + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int exitCode = job.waitForCompletion(true) ? 0 : 1; + + if (exitCode == 0) { + Date end_time = new Date(); + System.out.println("Job ended: " + end_time); + System.out.println("The job took " + + (end_time.getTime() - startTime.getTime()) / 1000 + + " seconds."); + return job + .getCounters() + .findCounter("org.apache.hadoop.mapred.Task$Counter", + "REDUCE_OUTPUT_RECORDS").getValue(); + } else { + System.out.println("Job Failed!!!"); + } + + return -1; + } + + @Override + public int run(String[] args) throws Exception { + return (int) runJob(args); + } + + public static class TextToMutationMapper extends Mapper<LongWritable, Text, Text, Mutation> { + protected RDFParser parser; + private String prefix; + private RDFFormat rdfFormat; + protected Text spo_table; + private Text po_table; + private Text osp_table; + private byte[] cv = CloudbaseRdfConstants.EMPTY_CV.getExpression(); + + public TextToMutationMapper() { + } + + @Override + protected void setup(final Context context) throws IOException, InterruptedException { + super.setup(context); + Configuration conf = context.getConfiguration(); + prefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null); + if (prefix != null) { + RdfCloudTripleStoreConstants.prefixTables(prefix); + } + + spo_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); + po_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX); + osp_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX); + + final String cv_s = conf.get(MRUtils.CB_CV_PROP); + if (cv_s != null) + cv = cv_s.getBytes(); + + rdfFormat = RDFFormat.valueOf(conf.get(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.toString())); + parser = Rio.createParser(rdfFormat); + final RyaTableMutationsFactory mut = new RyaTableMutationsFactory(); + + parser.setRDFHandler(new RDFHandler() { + + @Override + public void startRDF() throws RDFHandlerException { + + } + + @Override + public void endRDF() throws RDFHandlerException { + + } + + @Override + public void handleNamespace(String s, String s1) throws RDFHandlerException { + + } + + @Override + public void handleStatement(Statement statement) throws RDFHandlerException { + try { + RyaStatement ryaStatement = RdfToRyaConversions.convertStatement(statement); + if(ryaStatement.getColumnVisibility() == null) { + ryaStatement.setColumnVisibility(cv); + } + Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutationMap = + mut.serialize(ryaStatement); + Collection<Mutation> spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO); + Collection<Mutation> po = 
mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO); + Collection<Mutation> osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP); + + for (Mutation m : spo) { + context.write(spo_table, m); + } + for (Mutation m : po) { + context.write(po_table, m); + } + for (Mutation m : osp) { + context.write(osp_table, m); + } + } catch (Exception e) { + throw new RDFHandlerException(e); + } + } + + @Override + public void handleComment(String s) throws RDFHandlerException { + + } + }); + } + + @Override + protected void map(LongWritable key, Text value, final Context context) throws IOException, InterruptedException { + try { + parser.parse(new StringReader(value.toString()), ""); + } catch (Exception e) { + throw new IOException(e); + } + } + + } +} + http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java new file mode 100644 index 0000000..54f9a13 --- /dev/null +++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java @@ -0,0 +1,115 @@ +package mvm.rya.cloudbase.mr.fileinput; + +import mvm.rya.api.domain.utils.RyaStatementWritable; +import mvm.rya.api.resolver.RdfToRyaConversions; +import mvm.rya.cloudbase.mr.utils.MRUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.openrdf.model.Statement; +import org.openrdf.rio.*; + +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Be able to input multiple rdf formatted files. Convert from rdf format to statements. 
+ * Class RdfFileInputFormat + * Date: May 16, 2011 + * Time: 2:11:24 PM + */ +public class RdfFileInputFormat extends FileInputFormat<LongWritable, RyaStatementWritable> { + + @Override + public RecordReader<LongWritable, RyaStatementWritable> createRecordReader(InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + return new RdfFileRecordReader(); + } + + private class RdfFileRecordReader extends RecordReader<LongWritable, RyaStatementWritable> implements RDFHandler { + + boolean closed = false; + long count = 0; + BlockingQueue<RyaStatementWritable> queue = new LinkedBlockingQueue<RyaStatementWritable>(); + int total = 0; + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + FileSplit fileSplit = (FileSplit) inputSplit; + Configuration conf = taskAttemptContext.getConfiguration(); + String rdfForm_s = conf.get(MRUtils.FORMAT_PROP, RDFFormat.RDFXML.getName()); //default to RDF/XML + RDFFormat rdfFormat = RDFFormat.valueOf(rdfForm_s); + + Path file = fileSplit.getPath(); + FileSystem fs = file.getFileSystem(conf); + FSDataInputStream fileIn = fs.open(fileSplit.getPath()); + + RDFParser rdfParser = Rio.createParser(rdfFormat); + rdfParser.setRDFHandler(this); + try { + rdfParser.parse(fileIn, ""); + } catch (Exception e) { + throw new IOException(e); + } + fileIn.close(); + total = queue.size(); + //TODO: Make this threaded so that you don't hold too many statements before sending them + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + return queue.size() > 0; + } + + @Override + public LongWritable getCurrentKey() throws IOException, InterruptedException { + return new LongWritable(count++); + } + + @Override + public RyaStatementWritable getCurrentValue() throws IOException, InterruptedException { + return queue.poll(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return ((float) (total - queue.size())) / ((float) total); + } + + @Override + public void close() throws IOException { + closed = true; + } + + @Override + public void startRDF() throws RDFHandlerException { + } + + @Override + public void endRDF() throws RDFHandlerException { + } + + @Override + public void handleNamespace(String s, String s1) throws RDFHandlerException { + } + + @Override + public void handleStatement(Statement statement) throws RDFHandlerException { + queue.add(new RyaStatementWritable(RdfToRyaConversions.convertStatement(statement))); + } + + @Override + public void handleComment(String s) throws RDFHandlerException { + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java new file mode 100644 index 0000000..f48cbae --- /dev/null +++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java @@ -0,0 +1,185 @@ +package mvm.rya.cloudbase.mr.fileinput; + +import cloudbase.core.client.mapreduce.CloudbaseOutputFormat; +import cloudbase.core.data.Mutation; +import cloudbase.core.security.ColumnVisibility; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import 
mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.utils.RyaStatementWritable; +import mvm.rya.cloudbase.CloudbaseRdfConstants; +import mvm.rya.cloudbase.RyaTableMutationsFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.openrdf.rio.RDFFormat; + +import java.io.IOException; +import java.util.Collection; +import java.util.Date; +import java.util.Map; + +import static mvm.rya.cloudbase.mr.utils.MRUtils.*; + +/** + * Do bulk import of rdf files + * Class RdfFileInputTool + * Date: May 16, 2011 + * Time: 3:12:16 PM + */ +public class RdfFileInputTool implements Tool { + + private Configuration conf; + + private String userName = "root"; + private String pwd = "password"; + private String instance = "stratus"; + private String zk = "10.40.190.113:2181"; + private String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF; + private String format = RDFFormat.RDFXML.getName(); + + + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + public static void main(String[] args) { + try { + ToolRunner.run(new Configuration(), new RdfFileInputTool(), args); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException { + //faster + conf.setBoolean("mapred.map.tasks.speculative.execution", false); + conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); + + zk = conf.get(CB_ZK_PROP, zk); + instance = conf.get(CB_INSTANCE_PROP, instance); + userName = conf.get(CB_USERNAME_PROP, userName); + pwd = conf.get(CB_PWD_PROP, pwd); + + tablePrefix = conf.get(TABLE_PREFIX_PROPERTY, tablePrefix); + format = conf.get(FORMAT_PROP, format); + conf.set(FORMAT_PROP, format); + + Job job = new Job(conf); + job.setJarByClass(RdfFileInputTool.class); + + // set up cloudbase input + job.setInputFormatClass(RdfFileInputFormat.class); + RdfFileInputFormat.addInputPath(job, new Path(args[0])); + + // set input output of the particular job + job.setMapOutputKeyClass(LongWritable.class); + job.setMapOutputValueClass(RyaStatementWritable.class); +// job.setOutputKeyClass(LongWritable.class); +// job.setOutputValueClass(StatementWritable.class); + + job.setOutputFormatClass(CloudbaseOutputFormat.class); + CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); + CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk); + + // set mapper and reducer classes + job.setMapperClass(StatementToMutationMapper.class); + job.setNumReduceTasks(0); +// job.setReducerClass(Reducer.class); + + // set output +// Path outputDir = new Path("/temp/sparql-out/testout"); +// FileSystem dfs = FileSystem.get(outputDir.toUri(), conf); +// if (dfs.exists(outputDir)) +// dfs.deleteMutation(outputDir, true); +// +// FileOutputFormat.setOutputPath(job, outputDir); + + // Submit the job + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int exitCode = job.waitForCompletion(true) ? 
0 : 1; + + if (exitCode == 0) { + Date end_time = new Date(); + System.out.println("Job ended: " + end_time); + System.out.println("The job took " + + (end_time.getTime() - startTime.getTime()) / 1000 + + " seconds."); + return job + .getCounters() + .findCounter("org.apache.hadoop.mapred.Task$Counter", + "REDUCE_OUTPUT_RECORDS").getValue(); + } else { + System.out.println("Job Failed!!!"); + } + + return -1; + } + + @Override + public int run(String[] args) throws Exception { + runJob(args); + return 0; + } + + public static class StatementToMutationMapper extends Mapper<LongWritable, RyaStatementWritable, Text, Mutation> { + protected String tablePrefix; + protected Text spo_table; + protected Text po_table; + protected Text osp_table; + private byte[] cv = CloudbaseRdfConstants.EMPTY_CV.getExpression(); + RyaTableMutationsFactory mut = new RyaTableMutationsFactory(); + + public StatementToMutationMapper() { + } + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + Configuration conf = context.getConfiguration(); + tablePrefix = conf.get(TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF); + spo_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); + po_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX); + osp_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX); + + final String cv_s = conf.get(CB_CV_PROP); + if (cv_s != null) + cv = cv_s.getBytes(); + } + + @Override + protected void map(LongWritable key, RyaStatementWritable value, Context context) throws IOException, InterruptedException { + RyaStatement statement = value.getRyaStatement(); + if (statement.getColumnVisibility() == null) { + statement.setColumnVisibility(cv); + } + Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutationMap = + mut.serialize(statement); + Collection<Mutation> spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO); + Collection<Mutation> po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO); + Collection<Mutation> osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP); + + for (Mutation m : spo) { + context.write(spo_table, m); + } + for (Mutation m : po) { + context.write(po_table, m); + } + for (Mutation m : osp) { + context.write(osp_table, m); + } + } + + } +} + http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java new file mode 100644 index 0000000..5d7d971 --- /dev/null +++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java @@ -0,0 +1,314 @@ +//package mvm.rya.cloudbase.mr.fileinput; +// +//import cloudbase.core.client.Connector; +//import cloudbase.core.client.ZooKeeperInstance; +//import cloudbase.core.client.admin.TableOperations; +//import cloudbase.core.client.mapreduce.CloudbaseFileOutputFormat; +//import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner; +//import cloudbase.core.data.Key; +//import cloudbase.core.data.Value; +//import cloudbase.core.util.TextUtil; +//import com.google.common.base.Preconditions; +//import 
mvm.rya.api.RdfCloudTripleStoreConstants; +//import mvm.rya.cloudbase.CloudbaseRdfConstants; +//import mvm.rya.cloudbase.RyaTableKeyValues; +//import mvm.rya.cloudbase.mr.utils.MRUtils; +//import mvm.rya.cloudbase.utils.bulk.KeyRangePartitioner; +//import mvm.rya.cloudbase.utils.shard.HashAlgorithm; +//import mvm.rya.cloudbase.utils.shard.HashCodeHashAlgorithm; +//import org.apache.commons.codec.binary.Base64; +//import org.apache.hadoop.conf.Configuration; +//import org.apache.hadoop.conf.Configured; +//import org.apache.hadoop.fs.FileSystem; +//import org.apache.hadoop.fs.Path; +//import org.apache.hadoop.io.LongWritable; +//import org.apache.hadoop.io.Text; +//import org.apache.hadoop.mapreduce.Job; +//import org.apache.hadoop.mapreduce.Mapper; +//import org.apache.hadoop.mapreduce.Reducer; +//import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +//import org.apache.hadoop.util.Tool; +//import org.apache.hadoop.util.ToolRunner; +//import org.openrdf.model.Resource; +//import org.openrdf.model.Statement; +//import org.openrdf.rio.*; +// +//import java.io.BufferedOutputStream; +//import java.io.IOException; +//import java.io.PrintStream; +//import java.io.StringReader; +//import java.util.ArrayList; +//import java.util.Collection; +//import java.util.Map; +// +//import static com.google.common.base.Preconditions.checkNotNull; +// +///** +//* Take large ntrips files and use MapReduce and Cloudbase +//* Bulk ingest techniques to load into the table in our partition format. +//* Uses a sharded scheme +//* <p/> +//* Input: NTrips file +//* Map: +//* - key : shard row - Text +//* - value : stmt in doc triple format - Text +//* Partitioner: RangePartitioner +//* Reduce: +//* - key : all the entries for each triple - Cloudbase Key +//* Class BulkNtripsInputTool +//* Date: Sep 13, 2011 +//* Time: 10:00:17 AM +//*/ +//public class ShardedBulkNtripsInputTool extends Configured implements Tool { +// +// public static final String WORKDIR_PROP = "bulk.n3.workdir"; +// public static final String BULK_N3_NUMSHARD = "bulk.n3.numshard"; +// +// private String userName = "root"; +// private String pwd = "password"; +// private String instance = "stratus"; +// private String zk = "10.40.190.129:2181"; +// private String ttl = null; +// private String workDirBase = "/temp/bulkcb/work"; +// private String format = RDFFormat.NTRIPLES.getName(); +// private int numShards; +// +// @Override +// public int run(final String[] args) throws Exception { +// final Configuration conf = getConf(); +// try { +// //conf +// zk = conf.get(MRUtils.CB_ZK_PROP, zk); +// ttl = conf.get(MRUtils.CB_TTL_PROP, ttl); +// instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance); +// userName = conf.get(MRUtils.CB_USERNAME_PROP, userName); +// pwd = conf.get(MRUtils.CB_PWD_PROP, pwd); +// workDirBase = conf.get(WORKDIR_PROP, workDirBase); +// format = conf.get(MRUtils.FORMAT_PROP, format); +// String numShards_s = conf.get(BULK_N3_NUMSHARD); +// Preconditions.checkArgument(numShards_s != null); +// numShards = Integer.parseInt(numShards_s); +// conf.set(MRUtils.FORMAT_PROP, format); +// final String inputDir = args[0]; +// +// ZooKeeperInstance zooKeeperInstance = new ZooKeeperInstance(instance, zk); +// Connector connector = zooKeeperInstance.getConnector(userName, pwd); +// TableOperations tableOperations = connector.tableOperations(); +// +// String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null); +// if (tablePrefix != null) +// RdfCloudTripleStoreConstants.prefixTables(tablePrefix); +// String[] tables = 
{tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX, +// tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, +// tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX}; +// Collection<Job> jobs = new ArrayList<Job>(); +// for (final String table : tables) { +// for (int i = 0; i < numShards; i++) { +// final String tableName = table + i; +// PrintStream out = null; +// try { +// String workDir = workDirBase + "/" + tableName; +// System.out.println("Loading data into table[" + tableName + "]"); +// +// Job job = new Job(new Configuration(conf), "Bulk Ingest load data to Generic RDF Table[" + tableName + "]"); +// job.setJarByClass(this.getClass()); +// //setting long job +// job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false); +// job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false); +// job.getConfiguration().set("io.sort.mb", "256"); +// job.getConfiguration().setBoolean("mapred.compress.map.output", true); +// job.getConfiguration().set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); //TODO: I would like LZO compression +// +// job.setInputFormatClass(TextInputFormat.class); +// +// job.setMapperClass(ShardedParseNtripsMapper.class); +// job.setMapOutputKeyClass(Key.class); +// job.setMapOutputValueClass(Value.class); +// +// job.setCombinerClass(OutStmtMutationsReducer.class); +// job.setReducerClass(OutStmtMutationsReducer.class); +// job.setOutputFormatClass(CloudbaseFileOutputFormat.class); +// CloudbaseFileOutputFormat.setZooKeeperInstance(job, instance, zk); +// +// job.getConfiguration().set(ShardedParseNtripsMapper.TABLE_PROPERTY, tableName); +// job.getConfiguration().set(ShardedParseNtripsMapper.SHARD_PROPERTY, i + ""); +// +// TextInputFormat.setInputPaths(job, new Path(inputDir)); +// +// FileSystem fs = FileSystem.get(conf); +// Path workPath = new Path(workDir); +// if (fs.exists(workPath)) +// fs.deleteMutation(workPath, true); +// +// CloudbaseFileOutputFormat.setOutputPath(job, new Path(workDir + "/files")); +// +// out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt")))); +// +// if (!tableOperations.exists(tableName)) +// tableOperations.create(tableName); +// Collection<Text> splits = tableOperations.getSplits(tableName, Integer.MAX_VALUE); +// for (Text split : splits) +// out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split)))); +// +// job.setNumReduceTasks(splits.size() + 1); +// out.close(); +// +// job.setPartitionerClass(KeyRangePartitioner.class); +// RangePartitioner.setSplitFile(job, workDir + "/splits.txt"); +// +// job.getConfiguration().set(WORKDIR_PROP, workDir); +// +// job.submit(); +// jobs.add(job); +// +// } catch (Exception re) { +// throw new RuntimeException(re); +// } finally { +// if (out != null) +// out.close(); +// } +// } +// } +// +// for (Job job : jobs) { +// while (!job.isComplete()) { +// Thread.sleep(1000); +// } +// } +// +// for (String table : tables) { +// for (int i = 0; i < numShards; i++) { +// final String tableName = table + i; +// String workDir = workDirBase + "/" + tableName; +// tableOperations.importDirectory( +// tableName, +// workDir + "/files", +// workDir + "/failures", +// 20, +// 4, +// false); +// } +// } +// +// } catch (Exception e) { +// throw new RuntimeException(e); +// } +// +// return 0; +// } +// +// public static void main(String[] args) { +// try { +// ToolRunner.run(new Configuration(), new ShardedBulkNtripsInputTool(), args); +// } 
catch (Exception e) { +// e.printStackTrace(); +// } +// } +// +// /** +// * input: ntrips format triple +// * <p/> +// * output: key: shard row from generator +// * value: stmt in serialized format (document format) +// */ +// public static class ShardedParseNtripsMapper extends Mapper<LongWritable, Text, Key, Value> { +// public static final String TABLE_PROPERTY = "shardedparsentripsmapper.table"; +// public static final String SHARD_PROPERTY = "shardedparsentripsmapper.shard"; +// +// private RDFParser parser; +// private String rdfFormat; +// private HashAlgorithm hashAlgorithm = new HashCodeHashAlgorithm(); +// private int shard; +// private int numShards; +// +// @Override +// protected void setup(final Context context) throws IOException, InterruptedException { +// super.setup(context); +// Configuration conf = context.getConfiguration(); +// final String table = conf.get(TABLE_PROPERTY); +// Preconditions.checkNotNull(table, "Set the " + TABLE_PROPERTY + " property on the map reduce job"); +// +// String shard_s = conf.get(SHARD_PROPERTY); +// Preconditions.checkNotNull(shard_s, "Set the " + SHARD_PROPERTY + " property"); +// shard = Integer.parseInt(shard_s); +// +// numShards = Integer.parseInt(conf.get(BULK_N3_NUMSHARD)); +// +// final String cv_s = conf.get(MRUtils.CB_CV_PROP); +// rdfFormat = conf.get(MRUtils.FORMAT_PROP); +// checkNotNull(rdfFormat, "Rdf format cannot be null"); +// +// parser = Rio.createParser(RDFFormat.valueOf(rdfFormat)); +// parser.setRDFHandler(new RDFHandler() { +// +// @Override +// public void startRDF() throws RDFHandlerException { +// +// } +// +// @Override +// public void endRDF() throws RDFHandlerException { +// +// } +// +// @Override +// public void handleNamespace(String s, String s1) throws RDFHandlerException { +// +// } +// +// @Override +// public void handleStatement(Statement statement) throws RDFHandlerException { +// try { +// Resource subject = statement.getSubject(); +// if ((hashAlgorithm.hash(subject.stringValue()) % numShards) != shard) { +// return; +// } +// RyaTableKeyValues rdfTableKeyValues = new RyaTableKeyValues(subject, statement.getPredicate(), statement.getObject(), cv_s, statement.getContext()).invoke(); +// Collection<Map.Entry<Key, Value>> entries = null; +// if (table.contains(RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX)) { +// entries = rdfTableKeyValues.getSpo(); +// } else if (table.contains(RdfCloudTripleStoreConstants.TBL_PO_SUFFIX)) { +// entries = rdfTableKeyValues.getPo(); +// } else if (table.contains(RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX)) { +// entries = rdfTableKeyValues.getOsp(); +// } else +// throw new IllegalArgumentException("Unrecognized table[" + table + "]"); +// +// for (Map.Entry<Key, Value> entry : entries) { +// context.write(entry.getKey(), entry.getValue()); +// } +// } catch (Exception e) { +// throw new RDFHandlerException(e); +// } +// } +// +// @Override +// public void handleComment(String s) throws RDFHandlerException { +// +// } +// }); +// } +// +// @Override +// public void map(LongWritable key, Text value, Context output) +// throws IOException, InterruptedException { +// String rdf = value.toString(); +// try { +// parser.parse(new StringReader(rdf), ""); +// } catch (RDFParseException e) { +// System.out.println("Line[" + rdf + "] cannot be formatted with format[" + rdfFormat + "]. 
Exception[" + e.getMessage() + "]"); +// } catch (Exception e) { +// e.printStackTrace(); +// throw new IOException("Exception occurred parsing triple[" + rdf + "]"); +// } +// } +// } +// +// public static class OutStmtMutationsReducer extends Reducer<Key, Value, Key, Value> { +// +// public void reduce(Key key, Iterable<Value> values, Context output) +// throws IOException, InterruptedException { +// output.write(key, CloudbaseRdfConstants.EMPTY_VALUE); +// } +// } +//} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java new file mode 100644 index 0000000..453d6ca --- /dev/null +++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java @@ -0,0 +1,350 @@ +//package mvm.rya.cloudbase.mr.upgrade; +// +//import cloudbase.core.client.Connector; +//import cloudbase.core.client.ZooKeeperInstance; +//import cloudbase.core.client.admin.TableOperations; +//import cloudbase.core.client.mapreduce.CloudbaseInputFormat; +//import cloudbase.core.client.mapreduce.CloudbaseOutputFormat; +//import cloudbase.core.data.Key; +//import cloudbase.core.data.Mutation; +//import cloudbase.core.data.Range; +//import cloudbase.core.data.Value; +//import cloudbase.core.security.Authorizations; +//import cloudbase.core.security.ColumnVisibility; +//import cloudbase.core.util.Pair; +//import com.google.common.collect.Lists; +//import com.google.common.io.ByteArrayDataInput; +//import com.google.common.io.ByteArrayDataOutput; +//import com.google.common.io.ByteStreams; +//import mvm.rya.api.InvalidValueTypeMarkerRuntimeException; +//import mvm.rya.api.RdfCloudTripleStoreConstants; +//import mvm.rya.cloudbase.CloudbaseRdfConfiguration; +//import mvm.rya.cloudbase.CloudbaseRdfConstants; +//import mvm.rya.cloudbase.CloudbaseRyaDAO; +//import mvm.rya.cloudbase.RyaTableMutationsFactory; +//import mvm.rya.cloudbase.mr.utils.MRUtils; +//import org.apache.hadoop.conf.Configuration; +//import org.apache.hadoop.conf.Configured; +//import org.apache.hadoop.io.Text; +//import org.apache.hadoop.mapreduce.Job; +//import org.apache.hadoop.mapreduce.Mapper; +//import org.apache.hadoop.util.Tool; +//import org.apache.hadoop.util.ToolRunner; +//import org.openrdf.model.*; +//import org.openrdf.model.impl.StatementImpl; +//import org.openrdf.model.impl.ValueFactoryImpl; +// +//import java.io.IOException; +//import java.util.ArrayList; +//import java.util.Collection; +//import java.util.Date; +//import java.util.Map; +// +//import static mvm.rya.api.RdfCloudTripleStoreUtils.*; +// +///** +// * 1. Check version. <br/> +// * 2. 
If version does not exist, apply: <br/> +// * - DELIM => 1 -> 0 +// * - DELIM_STOP => 2 -> 1 +// * - 3 table index +// */ +//public class UpgradeCloudbaseRdfTables extends Configured implements Tool { +// public static final String TMP = "_tmp"; +// public static final String DELETE_PROP = "rdf.upgrade.deleteMutation"; //true if ok to deleteMutation old tables +// private String zk = "10.40.190.113:2181"; +// private String instance = "stratus"; +// private String userName = "root"; +// private String pwd = "password"; +// private String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF; +// private CloudbaseRdfConfiguration conf = new CloudbaseRdfConfiguration(); +// +// @Override +// public int run(String[] strings) throws Exception { +// conf = new CloudbaseRdfConfiguration(getConf()); +// //faster +// conf.setBoolean("mapred.map.tasks.speculative.execution", false); +// conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); +// conf.set(MRUtils.JOB_NAME_PROP, "Upgrading Cloudbase Rdf Tables"); +// +// zk = conf.get(MRUtils.CB_ZK_PROP, zk); +// instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance); +// userName = conf.get(MRUtils.CB_USERNAME_PROP, userName); +// pwd = conf.get(MRUtils.CB_PWD_PROP, pwd); +// +// tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, tablePrefix); +// +// Authorizations authorizations = CloudbaseRdfConstants.ALL_AUTHORIZATIONS; +// String auth = conf.get(MRUtils.CB_AUTH_PROP); +// if (auth != null) +// authorizations = new Authorizations(auth.split(",")); +// +// boolean deleteTables = conf.getBoolean(DELETE_PROP, false); +// +// //tables +// String spo = tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX; +// String po = tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX; +// String osp = tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX; +// String so = tablePrefix + "so"; +// String ops = tablePrefix + "o"; +// +// //check version first +// Connector connector = new ZooKeeperInstance(instance, zk).getConnector(userName, pwd.getBytes()); +// CloudbaseRyaDAO rdfDAO = new CloudbaseRyaDAO(); +// rdfDAO.setConnector(connector); +// conf.setTablePrefix(tablePrefix); +// rdfDAO.setConf(conf); +//// rdfDAO.setSpoTable(spo); +//// rdfDAO.setPoTable(po); +//// rdfDAO.setOspTable(osp); +//// rdfDAO.setNamespaceTable(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX); +// rdfDAO.init(); +// String version = rdfDAO.getVersion(); +// if (version != null) { +// //TODO: Do a version check here +// //version found, no need to upgrade +// return 0; +// } +// +// rdfDAO.destroy(); +// +// //create osp table, deleteMutation so and o tables +// TableOperations tableOperations = connector.tableOperations(); +// if (deleteTables) { +// if (tableOperations.exists(so)) { +// tableOperations.deleteMutation(so); +// } +// if (tableOperations.exists(ops)) { +// tableOperations.deleteMutation(ops); +// } +// } +// +// conf.set("io.sort.mb", "256"); +// Job job = new Job(conf); +// job.setJarByClass(UpgradeCloudbaseRdfTables.class); +// +// //set up cloudbase input +// job.setInputFormatClass(CloudbaseInputFormat.class); +// CloudbaseInputFormat.setInputInfo(job, userName, pwd.getBytes(), spo, authorizations); +// CloudbaseInputFormat.setZooKeeperInstance(job, instance, zk); +// Collection<Pair<Text, Text>> columns = new ArrayList<Pair<Text, Text>>(); +// final Pair pair = new Pair(RdfCloudTripleStoreConstants.INFO_TXT, RdfCloudTripleStoreConstants.INFO_TXT); +// columns.add(pair); +// CloudbaseInputFormat.fetchColumns(job, columns); 
+// +// CloudbaseInputFormat.setRanges(job, Lists.newArrayList(new Range(new Text(new byte[]{}), new Text(new byte[]{Byte.MAX_VALUE})))); +// +// // set input output of the particular job +// job.setMapOutputKeyClass(Text.class); +// job.setMapOutputValueClass(Mutation.class); +// +// //no reducer needed? +// job.setNumReduceTasks(0); +// job.setMapperClass(UpgradeCloudbaseRdfTablesMapper.class); +// +// CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, spo + TMP); +// CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk); +// job.setOutputFormatClass(CloudbaseOutputFormat.class); +// +// // Submit the job +// Date startTime = new Date(); +// System.out.println("Job started: " + startTime); +// int exitCode = job.waitForCompletion(true) ? 0 : 1; +// +// if (exitCode == 0) { +// Date end_time = new Date(); +// System.out.println("Job ended: " + end_time); +// System.out.println("The job took " +// + (end_time.getTime() - startTime.getTime()) / 1000 +// + " seconds."); +// +// //now deleteMutation old spo table, and rename tmp one +// if (deleteTables) { +// tableOperations.deleteMutation(spo); +// tableOperations.rename(spo + TMP, spo); +// tableOperations.deleteMutation(po); +// tableOperations.rename(po + TMP, po); +// tableOperations.deleteMutation(osp); +// tableOperations.rename(osp + TMP, osp); +// } +// +// return 0; +// } else { +// System.out.println("Job Failed!!!"); +// } +// +// return -1; +// } +// +// public static void main(String[] args) { +// try { +// ToolRunner.run(new Configuration(), new UpgradeCloudbaseRdfTables(), args); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } +// +// public static class UpgradeCloudbaseRdfTablesMapper extends Mapper<Key, Value, Text, Mutation> { +// private String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF; +// ValueFactoryImpl vf = new ValueFactoryImpl(); +// +// private Text spo_table, po_table, osp_table; +// +// RyaTableMutationsFactory mut = new RyaTableMutationsFactory(); +// +// @Override +// protected void setup(Context context) throws IOException, InterruptedException { +// super.setup(context); +// Configuration conf = context.getConfiguration(); +// tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, tablePrefix); +// String spo = tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX + TMP; +// String po = tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX + TMP; +// String osp = tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX + TMP; +// +// spo_table = new Text(spo); +// po_table = new Text(po); +// osp_table = new Text(osp); +// } +// +// @Override +// protected void map(Key key, Value value, Context context) throws IOException, InterruptedException { +// //read in old format +// Statement statement = null; +// try { +// statement = translateOldStatementFromRow(ByteStreams.newDataInput(key.getRow().getBytes()), "spo", vf); +// } catch (Exception e) { +// //not the right version +// return; +// } +// +// //translate to new format and save in new tables +// Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Mutation> mutationMap = mut.serialize(statement.getSubject(), statement.getPredicate(), statement.getObject(), new ColumnVisibility(key.getColumnVisibility()), statement.getContext()); +// Mutation spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO); +// Mutation po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO); +// Mutation osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP); +// +// 
context.write(spo_table, spo); +// context.write(po_table, po); +// context.write(osp_table, osp); +// +// //TODO: Contexts +// } +// } +// +// public static org.openrdf.model.Value readOldValue(ByteArrayDataInput dataIn, ValueFactory vf) +// throws IOException, ClassCastException { +// int valueTypeMarker; +// try { +// valueTypeMarker = dataIn.readByte(); +// } catch (Exception e) { +// return null; +// } +// +// org.openrdf.model.Value ret = null; +// if (valueTypeMarker == RdfCloudTripleStoreConstants.URI_MARKER) { +// String uriString = readString(dataIn); +// ret = vf.createURI(uriString); +// } else if (valueTypeMarker == RdfCloudTripleStoreConstants.BNODE_MARKER) { +// String bnodeID = readString(dataIn); +// ret = vf.createBNode(bnodeID); +// } else if (valueTypeMarker == RdfCloudTripleStoreConstants.PLAIN_LITERAL_MARKER) { +// String label = readString(dataIn); +// ret = vf.createLiteral(label); +// } else if (valueTypeMarker == RdfCloudTripleStoreConstants.LANG_LITERAL_MARKER) { +// String label = readString(dataIn); +// String language = readString(dataIn); +// ret = vf.createLiteral(label, language); +// } else if (valueTypeMarker == RdfCloudTripleStoreConstants.DATATYPE_LITERAL_MARKER) { +// String label = readString(dataIn); +// URI datatype = (URI) readOldValue(dataIn, vf); +// ret = vf.createLiteral(label, datatype); +// } else { +// throw new InvalidValueTypeMarkerRuntimeException(valueTypeMarker, "Invalid value type marker: " +// + valueTypeMarker); +// } +// +// return ret; +// } +// +// public static Statement translateOldStatementFromRow(ByteArrayDataInput input, String table, ValueFactory vf) throws IOException { +// Resource subject; +// URI predicate; +// org.openrdf.model.Value object; +// if ("spo".equals(table)) { +// subject = (Resource) readOldValue(input, vf); +// input.readByte(); +// predicate = (URI) readOldValue(input, vf); +// input.readByte(); +// object = readOldValue(input, vf); +// } else if ("o".equals(table)) { +// object = readOldValue(input, vf); +// input.readByte(); +// predicate = (URI) readOldValue(input, vf); +// input.readByte(); +// subject = (Resource) readOldValue(input, vf); +// } else if ("po".equals(table)) { +// predicate = (URI) readOldValue(input, vf); +// input.readByte(); +// object = readOldValue(input, vf); +// input.readByte(); +// subject = (Resource) readOldValue(input, vf); +// } else { +// //so +// subject = (Resource) readOldValue(input, vf); +// input.readByte(); +// object = readOldValue(input, vf); +// input.readByte(); +// predicate = (URI) readOldValue(input, vf); +// } +// return new StatementImpl(subject, predicate, object); +// } +// +// public static byte[] writeOldValue(org.openrdf.model.Value value) throws IOException { +// if (value == null) +// return new byte[]{}; +// ByteArrayDataOutput dataOut = ByteStreams.newDataOutput(); +// if (value instanceof URI) { +// dataOut.writeByte(RdfCloudTripleStoreConstants.URI_MARKER); +// writeString(((URI) value).toString(), dataOut); +// } else if (value instanceof BNode) { +// dataOut.writeByte(RdfCloudTripleStoreConstants.BNODE_MARKER); +// writeString(((BNode) value).getID(), dataOut); +// } else if (value instanceof Literal) { +// Literal lit = (Literal) value; +// +// String label = lit.getLabel(); +// String language = lit.getLanguage(); +// URI datatype = lit.getDatatype(); +// +// if (datatype != null) { +// dataOut.writeByte(RdfCloudTripleStoreConstants.DATATYPE_LITERAL_MARKER); +// writeString(label, dataOut); +// dataOut.write(writeOldValue(datatype)); +// } 
else if (language != null) { +// dataOut.writeByte(RdfCloudTripleStoreConstants.LANG_LITERAL_MARKER); +// writeString(label, dataOut); +// writeString(language, dataOut); +// } else { +// dataOut.writeByte(RdfCloudTripleStoreConstants.PLAIN_LITERAL_MARKER); +// writeString(label, dataOut); +// } +// } else { +// throw new IllegalArgumentException("unexpected value type: " +// + value.getClass()); +// } +// return dataOut.toByteArray(); +// } +// +// private static String OLD_DELIM = "\u0001"; +// private static byte[] OLD_DELIM_BYTES = OLD_DELIM.getBytes(); +// +// public static byte[] buildOldRowWith(byte[] bytes_one, byte[] bytes_two, byte[] bytes_three) throws IOException { +// ByteArrayDataOutput rowidout = ByteStreams.newDataOutput(); +// rowidout.write(bytes_one); +// rowidout.write(OLD_DELIM_BYTES); +// rowidout.write(bytes_two); +// rowidout.write(OLD_DELIM_BYTES); +// rowidout.write(bytes_three); +// return truncateRowId(rowidout.toByteArray()); +// } +//} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java new file mode 100644 index 0000000..950f585 --- /dev/null +++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java @@ -0,0 +1,94 @@ +package mvm.rya.cloudbase.mr.utils; + +import org.apache.hadoop.conf.Configuration; +import org.openrdf.model.URI; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; + +/** + * Class MRSailUtils + * Date: May 19, 2011 + * Time: 10:34:06 AM + */ +public class MRUtils { + + public static final String JOB_NAME_PROP = "mapred.job.name"; + + public static final String CB_USERNAME_PROP = "cb.username"; + public static final String CB_PWD_PROP = "cb.pwd"; + public static final String CB_ZK_PROP = "cb.zk"; + public static final String CB_INSTANCE_PROP = "cb.instance"; + public static final String CB_TTL_PROP = "cb.ttl"; + public static final String CB_CV_PROP = "cb.cv"; + public static final String CB_AUTH_PROP = "cb.auth"; + public static final String CB_MOCK_PROP = "cb.mock"; + public static final String TABLE_LAYOUT_PROP = "rdf.tablelayout"; + public static final String FORMAT_PROP = "rdf.format"; + + public static final String NAMED_GRAPH_PROP = "rdf.graph"; + + public static final String TABLE_PREFIX_PROPERTY = "rdf.tablePrefix"; + + // rdf constants + public static final ValueFactory vf = new ValueFactoryImpl(); + public static final URI RDF_TYPE = vf.createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#", "type"); + + + // cloudbase map reduce utils + +// public static Range retrieveRange(URI entry_key, URI entry_val) throws IOException { +// ByteArrayDataOutput startRowOut = ByteStreams.newDataOutput(); +// startRowOut.write(RdfCloudTripleStoreUtils.writeValue(entry_key)); +// if (entry_val != null) { +// startRowOut.write(RdfCloudTripleStoreConstants.DELIM_BYTES); +// startRowOut.write(RdfCloudTripleStoreUtils.writeValue(entry_val)); +// } +// byte[] startrow = startRowOut.toByteArray(); +// startRowOut.write(RdfCloudTripleStoreConstants.DELIM_STOP_BYTES); +// byte[] stoprow = startRowOut.toByteArray(); +// +// Range range = new Range(new Text(startrow), new Text(stoprow)); +// return range; +// } + + + public static String getCBTtl(Configuration conf) { + return 
conf.get(CB_TTL_PROP); + } + + public static String getCBUserName(Configuration conf) { + return conf.get(CB_USERNAME_PROP); + } + + public static String getCBPwd(Configuration conf) { + return conf.get(CB_PWD_PROP); + } + + public static String getCBZK(Configuration conf) { + return conf.get(CB_ZK_PROP); + } + + public static String getCBInstance(Configuration conf) { + return conf.get(CB_INSTANCE_PROP); + } + + public static void setCBUserName(Configuration conf, String str) { + conf.set(CB_USERNAME_PROP, str); + } + + public static void setCBPwd(Configuration conf, String str) { + conf.set(CB_PWD_PROP, str); + } + + public static void setCBZK(Configuration conf, String str) { + conf.set(CB_ZK_PROP, str); + } + + public static void setCBInstance(Configuration conf, String str) { + conf.set(CB_INSTANCE_PROP, str); + } + + public static void setCBTtl(Configuration conf, String str) { + conf.set(CB_TTL_PROP, str); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java new file mode 100644 index 0000000..d3f8ae7 --- /dev/null +++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java @@ -0,0 +1,34 @@ +package mvm.rya.cloudbase.query; + +import cloudbase.core.client.BatchScanner; +import cloudbase.core.data.Key; +import cloudbase.core.data.Value; +import com.google.common.base.Preconditions; +import mango.collect.AbstractCloseableIterable; +import mvm.rya.cloudbase.BatchScannerIterator; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +/** + */ +public class BatchScannerCloseableIterable extends AbstractCloseableIterable<Map.Entry<Key, Value>> { + + private BatchScanner scanner; + + public BatchScannerCloseableIterable(BatchScanner scanner) { + Preconditions.checkNotNull(scanner); + this.scanner = scanner; + } + + @Override + protected void doClose() throws IOException { + scanner.close(); + } + + @Override + protected Iterator<Map.Entry<Key, Value>> retrieveIterator() { + return new BatchScannerIterator(scanner.iterator()); + } +}
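For context, a hedged usage sketch of BatchScannerCloseableIterable; the connector construction mirrors the ZooKeeperInstance/Connector calls used elsewhere in this patch, while the createBatchScanner signature, the empty Range, the table name, and the assumption that the mango CloseableIterable contract exposes a close() method are illustrative rather than confirmed by this diff.

import cloudbase.core.client.BatchScanner;
import cloudbase.core.client.Connector;
import cloudbase.core.client.ZooKeeperInstance;
import cloudbase.core.data.Key;
import cloudbase.core.data.Range;
import cloudbase.core.data.Value;
import mvm.rya.cloudbase.CloudbaseRdfConstants;
import mvm.rya.cloudbase.query.BatchScannerCloseableIterable;

import java.util.Collections;
import java.util.Map;

public class BatchScannerIterableExample {
    public static void main(String[] args) throws Exception {
        // Placeholder instance, quorum, and credentials, as in the tools above
        Connector connector = new ZooKeeperInstance("cbInstance", "zoo1:2181")
                .getConnector("root", "secret");
        // Placeholder table name and query-thread count
        BatchScanner scanner = connector.createBatchScanner(
                "rya_spo", CloudbaseRdfConstants.ALL_AUTHORIZATIONS, 4);
        scanner.setRanges(Collections.singleton(new Range())); // unbounded range, for illustration only
        BatchScannerCloseableIterable rows = new BatchScannerCloseableIterable(scanner);
        try {
            for (Map.Entry<Key, Value> entry : rows) {
                System.out.println(entry.getKey());
            }
        } finally {
            rows.close(); // assumed to delegate to doClose(), closing the underlying BatchScanner
        }
    }
}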
