http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/assembly/job.xml
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/assembly/job.xml b/partition/mr.partition.rdf/src/main/assembly/job.xml
deleted file mode 100644
index 259b917..0000000
--- a/partition/mr.partition.rdf/src/main/assembly/job.xml
+++ /dev/null
@@ -1,38 +0,0 @@
-<assembly
-    xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
-    <id>job</id>
-    <formats>
-        <format>jar</format>
-    </formats>
-    <includeBaseDirectory>false</includeBaseDirectory>
-    <dependencySets>
-        <dependencySet>
-            <unpack>false</unpack>
-            <scope>runtime</scope>
-            <outputDirectory>lib</outputDirectory>
-            <excludes>
-                <exclude>org.apache.hadoop:hadoop-core</exclude>
-                <exclude>${artifact.groupId}:${artifact.artifactId}</exclude>
-            </excludes>
-        </dependencySet>
-        <dependencySet>
-            <unpack>false</unpack>
-            <scope>system</scope>
-            <outputDirectory>lib</outputDirectory>
-            <excludes>
-                <exclude>${artifact.groupId}:${artifact.artifactId}</exclude>
-            </excludes>
-        </dependencySet>
-    </dependencySets>
-    <fileSets>
-        <fileSet>
-            <directory>${basedir}/target/classes</directory>
-            <outputDirectory>/</outputDirectory>
-            <excludes>
-                <exclude>*.jar</exclude>
-            </excludes>
-        </fileSet>
-    </fileSets>
-</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/groovy/convertrdfdir.groovy
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/groovy/convertrdfdir.groovy b/partition/mr.partition.rdf/src/main/groovy/convertrdfdir.groovy
deleted file mode 100644
index e5e02ec..0000000
--- a/partition/mr.partition.rdf/src/main/groovy/convertrdfdir.groovy
+++ /dev/null
@@ -1,33 +0,0 @@
-import org.openrdf.rio.rdfxml.*
-import org.openrdf.rio.ntriples.NTriplesWriterFactory
-import org.openrdf.rio.RDFHandler
-
-@Grab(group='com.google.guava', module='guava', version='r06')
-@Grab(group='org.openrdf.sesame', module='sesame-rio-rdfxml', version='2.3.2')
-@Grab(group='org.openrdf.sesame', module='sesame-rio-ntriples', version='2.3.2')
-@Grab(group='org.slf4j', module='slf4j-simple', version='1.5.8')
-def convertDirRdfFormat(def dir, def outputFile) {
-    //read each file
-    assert dir.isDirectory()
-
-    def ntriplesWriter = NTriplesWriterFactory.newInstance().getWriter(new FileOutputStream(outputFile))
-
-    ntriplesWriter.startRDF()
-    dir.listFiles().each { it ->
-        //load file into rdfxml parser
-        def rdfxmlParser = RDFXMLParserFactory.newInstance().getParser()
-        rdfxmlParser.setRDFHandler(
-            [ startRDF: {},
-              endRDF: {},
-              handleNamespace: { def prefix, def uri -> ntriplesWriter.handleNamespace(prefix, uri)},
-              handleComment: {},
-              handleStatement: { def stmt -> ntriplesWriter.handleStatement stmt}] as RDFHandler
-        )
-        rdfxmlParser.parse(new FileInputStream(it), "")
-    }
-    ntriplesWriter.endRDF()
-}
-
-try{
-convertDirRdfFormat(new File(args[0]), new File(args[1]))
-}catch(Exception e) {e.printStackTrace();}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/MrTstBed.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/MrTstBed.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/MrTstBed.java
deleted file mode 100644
index e8b2e5a..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/MrTstBed.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package mvm.mmrts.rdf.partition.mr;
-
-import com.google.common.io.ByteStreams;
-import mvm.mmrts.rdf.partition.mr.transform.SparqlCloudbaseIFJob;
-
-import java.io.FileInputStream;
-
-/**
- * Class MrTstBed
- * Date: Sep 1, 2011
- * Time: 9:18:53 AM
- */
-public class MrTstBed {
-    public static void main(String[] args) {
-        try {
-//            String query = "PREFIX tdp: <http://here/2010/tracked-data-provenance/ns#>\n" +
-//                    "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" +
-//                    "PREFIX mvmpart: <urn:mvm.mmrts.partition.rdf/08/2011#>\n" +
-//                    "SELECT * WHERE\n" +
-//                    "{\n" +
-//                    "?id tdp:reportedAt ?timestamp. 
\n" + -// "FILTER(mvmpart:timeRange(?id, tdp:reportedAt, 1314898074000 , 1314898374000 , 'XMLDATETIME')).\n" + -// "?id tdp:performedBy ?system.\n" + -// "?id <http://here/2010/cmv/ns#hasMarkingText> \"U\".\n" + -// "?id rdf:type tdp:Sent.\n" + -// "} \n"; - - FileInputStream fis = new FileInputStream(args[0]); - String query = new String(ByteStreams.toByteArray(fis)); - fis.close(); - -// String query = "PREFIX tdp: <http://here/2010/tracked-data-provenance/ns#>\n" + -// "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" + -// "PREFIX mvmpart: <urn:mvm.mmrts.partition.rdf/08/2011#>\n" + -// "SELECT * WHERE\n" + -// "{\n" + -// "?id tdp:reportedAt ?timestamp.\n" + -// "FILTER(mvmpart:timeRange(?id, tdp:reportedAt, 1314381770000 , 1314381880000 , 'XMLDATETIME')).\n" + -// "?id tdp:performedBy ?system.\n" + -// "}"; - - new SparqlCloudbaseIFJob("partitionRdf", "root", "password", "stratus", "stratus13:2181", "/temp/queryout", MrTstBed.class, query).run(); - -// QueryParser parser = (new SPARQLParserFactory()).getParser(); -// TupleExpr expr = parser.parseQuery(query, "http://www.w3.org/1999/02/22-rdf-syntax-ns#").getTupleExpr(); -// System.out.println(expr); -// -// final Configuration queryConf = new Configuration(); -// expr.visit(new FilterTimeIndexVisitor(queryConf)); -// -// (new SubjectGroupingOptimizer(queryConf)).optimize(expr, null, null); -// -// System.out.println(expr); -// -// //make sure of only one shardlookup -// expr.visit(new QueryModelVisitorBase<RuntimeException>() { -// int count = 0; -// -// @Override -// public void meetOther(QueryModelNode node) throws RuntimeException { -// super.meetOther(node); -// count++; -// if (count > 1) -// throw new IllegalArgumentException("Query can only have one subject-star lookup"); -// } -// }); -// -// final Job job = new Job(queryConf); -// job.setJarByClass(MrTstBed.class); -// -// expr.visit(new QueryModelVisitorBase<RuntimeException>() { -// @Override -// public void meetOther(QueryModelNode node) throws RuntimeException { -// super.meetOther(node); -// -// //set up CloudbaseBatchScannerInputFormat here -// if (node instanceof ShardSubjectLookup) { -// System.out.println("Lookup: " + node); -// try { -// new SparqlCloudbaseIFTransformer((ShardSubjectLookup) node, queryConf, job, "partitionRdf", -// "root", "password", "stratus", "stratus13:2181"); -// } catch (QueryEvaluationException e) { -// e.printStackTrace(); -// } -// } -// } -// }); -// -// Path outputDir = new Path("/temp/sparql-out/testout"); -// FileSystem dfs = FileSystem.get(outputDir.toUri(), queryConf); -// if (dfs.exists(outputDir)) -// dfs.delete(outputDir, true); -// -// FileOutputFormat.setOutputPath(job, outputDir); -// -// // Submit the job -// Date startTime = new Date(); -// System.out.println("Job started: " + startTime); -// job.waitForCompletion(true); - } catch (Exception e) { - e.printStackTrace(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlPartitionStoreInputFormat.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlPartitionStoreInputFormat.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlPartitionStoreInputFormat.java deleted file mode 100644 index 15c9c79..0000000 --- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlPartitionStoreInputFormat.java +++ /dev/null @@ 
-1,411 +0,0 @@ -package mvm.mmrts.rdf.partition.mr; - -import cloudbase.core.client.ZooKeeperInstance; -import cloudbase.core.util.ArgumentChecker; -import mvm.mmrts.rdf.partition.PartitionSail; -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.*; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.query.*; -import org.openrdf.repository.Repository; -import org.openrdf.repository.RepositoryConnection; -import org.openrdf.repository.RepositoryException; -import org.openrdf.repository.sail.SailRepository; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; -import java.net.URLEncoder; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import static mvm.mmrts.rdf.partition.PartitionConstants.*; - -/** - * Class SparqlPartitionStoreInputFormat - * Date: Oct 28, 2010 - * Time: 11:48:17 AM - */ -public class SparqlPartitionStoreInputFormat extends InputFormat<LongWritable, MapWritable> { - - public static final String PREFIX = "mvm.mmrts.rdf.partition.mr.sparqlinputformat"; - public static final String INPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured"; - public static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured"; - public static final String USERNAME = PREFIX + ".username"; - public static final String PASSWORD = PREFIX + ".password"; - - public static final String INSTANCE_NAME = PREFIX + ".instanceName"; - public static final String ZK = PREFIX + ".zk"; - - public static final String STARTTIME = PREFIX + ".starttime"; - public static final String ENDTIME = PREFIX + ".endtime"; - public static final String TABLE = PREFIX + ".table"; - public static final String SHARD_TABLE = PREFIX + ".shardtable"; - public static final String SPARQL_QUERIES_PROP = PREFIX + ".sparql"; - public static final String MR_NUMTHREADS_PROP = PREFIX + ".numthreads"; -// public static final String RANGE_PROP = PREFIX + ".range"; -// public static final String NUM_RANGES_PROP = PREFIX + ".numranges"; -// public static final String TABLE_PREFIX_PROP = PREFIX + ".tablePrefix"; -// public static final String OFFSET_RANGE_PROP = PREFIX + ".offsetrange"; - -// public static final String INFER_PROP = PREFIX + ".infer"; - - private static final String UTF_8 = "UTF-8"; - - private static final ValueFactory vf = ValueFactoryImpl.getInstance(); - - static class SparqlInputSplit extends InputSplit implements Writable { - - protected String sparql; - protected String startTime; - protected String endTime; - protected String table; -// private Long offset; -// private Long limit; - - private SparqlInputSplit() { - } - - private SparqlInputSplit(String sparql, String startTime, String endTime, String table) { - this.sparql = sparql; - this.startTime = startTime; - this.endTime = endTime; - this.table = table; -// this.offset = offset; -// this.limit = limit; - } - - @Override - public long getLength() throws IOException, InterruptedException { - return 0; - } - - @Override - public String[] getLocations() throws IOException, InterruptedException { - return new String[]{sparql}; - } - - 
@Override - public void write(DataOutput dataOutput) throws IOException { - boolean startTimeExists = startTime != null; - dataOutput.writeBoolean(startTimeExists); - if (startTimeExists) - dataOutput.writeUTF(startTime); - - boolean endTimeExists = endTime != null; - dataOutput.writeBoolean(endTimeExists); - if (endTimeExists) - dataOutput.writeUTF(endTime); - - dataOutput.writeUTF(table); - dataOutput.writeUTF(sparql); - } - - @Override - public void readFields(DataInput dataInput) throws IOException { - if (dataInput.readBoolean()) - this.startTime = dataInput.readUTF(); - if (dataInput.readBoolean()) - this.endTime = dataInput.readUTF(); - this.table = dataInput.readUTF(); - this.sparql = dataInput.readUTF(); - } - } - - /** - * Create a SparqlInputSplit for every sparql query.<br> - * Separate a single sparql query into numRanges of time ranges. For example, - * a numRange of 3, with range of 1 day (ms), and 1 query, will have 3 input splits - * with the same query, however the first range will go from now to a day before, the second - * will go from the day before to the day before that, the third will go from the two days - * ago to forever back. - * <br><br> - * If the numRanges is not set, or set to 1, the inputsplit can only focus on a certain startTime, - * ttl. If these are not set, then look at all time. - * - * @param job - * @return - * @throws java.io.IOException - * @throws InterruptedException - */ - @Override - public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException { - validateOptions(job.getConfiguration()); - final Collection<String> queries = getSparqlQueries(job.getConfiguration()); - if (queries == null || queries.size() == 0) - throw new IOException("Queries cannot be null or empty"); - - String startTime_s = getStartTime(job.getConfiguration()); - String endTime_s = getEndTime(job.getConfiguration()); - - List<InputSplit> splits = new ArrayList<InputSplit>(); - for (String query : queries) { - splits.add(new SparqlInputSplit(query, startTime_s, endTime_s, getTable(job.getConfiguration()))); - } - return splits; - } - - @Override - public RecordReader<LongWritable, MapWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) - throws IOException, InterruptedException { - return new SparqlResultsRecordReader(taskAttemptContext.getConfiguration()); - } - - protected static String getUsername(Configuration conf) { - return conf.get(USERNAME); - } - - /** - * WARNING: The password is stored in the Configuration and shared with all - * MapReduce tasks; It is BASE64 encoded to provide a charset safe - * conversion to a string, and is not intended to be secure. - */ - protected static String getPassword(Configuration conf) { - return new String(Base64.decodeBase64(conf.get(PASSWORD, "").getBytes())); - } - - protected static String getInstance(Configuration conf) { - return conf.get(INSTANCE_NAME); - } - - public static void setSparqlQueries(JobContext job, String... queries) { - if (queries == null || queries.length == 0) - throw new IllegalArgumentException("Queries cannot be null or empty"); - - final Configuration conf = job.getConfiguration(); - setSparqlQueries(conf, queries); - } - - public static void setSparqlQueries(Configuration conf, String... 
queries) { - try { - Collection<String> qencs = new ArrayList<String>(); - for (String query : queries) { - final String qenc = URLEncoder.encode(query, UTF_8); - qencs.add(qenc); - } - conf.setStrings(SPARQL_QUERIES_PROP, qencs.toArray(new String[qencs.size()])); - } catch (UnsupportedEncodingException e) { - //what to do... - e.printStackTrace(); - } - } - - public static Collection<String> getSparqlQueries(Configuration conf) { - Collection<String> queries = new ArrayList<String>(); - final Collection<String> qencs = conf.getStringCollection(SPARQL_QUERIES_PROP); - for (String qenc : qencs) { - queries.add(qenc); - } - return queries; - } - - public static void setLongJob(JobContext job, Long time) { - Configuration conf = job.getConfiguration(); - //need to make the runtime longer, default 30 min - time = (time == null) ? 1800000 : time; - conf.setLong("mapreduce.tasktracker.healthchecker.script.timeout", time); - conf.set("mapred.child.java.opts", "-Xmx1G"); - conf.setBoolean("mapred.map.tasks.speculative.execution", false); - conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); - } - - public static void setInputInfo(JobContext job, String user, byte[] passwd) { - Configuration conf = job.getConfiguration(); - if (conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false)) - throw new IllegalStateException("Input info can only be set once per job"); - conf.setBoolean(INPUT_INFO_HAS_BEEN_SET, true); - - ArgumentChecker.notNull(user, passwd); - conf.set(USERNAME, user); - conf.set(PASSWORD, new String(Base64.encodeBase64(passwd))); - } - - public static void setEndTime(JobContext job, String endTime) { - Configuration conf = job.getConfiguration(); - conf.set(ENDTIME, endTime); - } - - public static String getEndTime(Configuration conf) { - return conf.get(ENDTIME); - } - - public static void setNumThreads(JobContext job, int numThreads) { - Configuration conf = job.getConfiguration(); - conf.setInt(MR_NUMTHREADS_PROP, numThreads); - } - - public static int getNumThreads(Configuration conf) { - return conf.getInt(MR_NUMTHREADS_PROP, -1); - } - - public static void setTable(JobContext job, String table) { - Configuration conf = job.getConfiguration(); - conf.set(TABLE, table); - } - - public static String getTable(Configuration conf) { - return conf.get(TABLE); - } - - public static void setShardTable(JobContext job, String table) { - Configuration conf = job.getConfiguration(); - conf.set(SHARD_TABLE, table); - } - - public static String getShardTable(Configuration conf) { - String t = conf.get(SHARD_TABLE); - return (t != null) ? 
t : getTable(conf); - } - - public static void setStartTime(JobContext job, String startTime) { - Configuration conf = job.getConfiguration(); - conf.set(STARTTIME, startTime); - } - - public static String getStartTime(Configuration conf) { - return conf.get(STARTTIME); - } - - public static void setZooKeeperInstance(JobContext job, String instanceName, String zk) { - Configuration conf = job.getConfiguration(); - if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false)) - throw new IllegalStateException("Instance info can only be set once per job"); - conf.setBoolean(INSTANCE_HAS_BEEN_SET, true); - - ArgumentChecker.notNull(instanceName, zk); - conf.set(INSTANCE_NAME, instanceName); - conf.set(ZK, zk); - } - - protected static void validateOptions(Configuration conf) throws IOException { - if (!conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false)) - throw new IOException("Input info has not been set."); - if (!conf.getBoolean(INSTANCE_HAS_BEEN_SET, false)) - throw new IOException("Instance info has not been set."); - if (conf.getStrings(SPARQL_QUERIES_PROP) == null) - throw new IOException("Sparql queries have not been set."); - } - - private class SparqlResultsRecordReader extends RecordReader<LongWritable, MapWritable> -// implements TupleQueryResultWriter, Runnable - { - - boolean closed = false; - long count = 0; - BlockingQueue<MapWritable> queue = new LinkedBlockingQueue<MapWritable>(); - private Repository repo; - String query; - - Configuration conf; - private TupleQueryResult result; - private RepositoryConnection conn; - - public SparqlResultsRecordReader(Configuration conf) { - this.conf = conf; - } - - @Override - public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - - try { - validateOptions(conf); - - SparqlInputSplit sis = (SparqlInputSplit) inputSplit; - this.query = sis.sparql; - - // init RdfCloudTripleStore - final PartitionSail store = new PartitionSail(new ZooKeeperInstance(getInstance(conf), - conf.get(ZK)).getConnector(getUsername(conf), getPassword(conf).getBytes()), getTable(conf), getShardTable(conf)); - - repo = new SailRepository(store); - repo.initialize(); - - conn = repo.getConnection(); - query = URLDecoder.decode(query, UTF_8); - TupleQuery tupleQuery = conn.prepareTupleQuery( - QueryLanguage.SPARQL, query); - - if (sis.startTime != null && sis.endTime != null) { - tupleQuery.setBinding(START_BINDING, vf.createLiteral(sis.startTime)); - tupleQuery.setBinding(END_BINDING, vf.createLiteral(sis.endTime)); - } - - int threads = getNumThreads(conf); - if (threads > 0) { - tupleQuery.setBinding(NUMTHREADS_PROP, vf.createLiteral(threads)); - } - - result = tupleQuery.evaluate(); - } catch (Exception e) { - throw new IOException("Exception occurred opening Repository", e); - } - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - try { - return result.hasNext(); - } catch (QueryEvaluationException e) { - throw new IOException(e); - } -// return false; - } - - @Override - public LongWritable getCurrentKey() throws IOException, InterruptedException { - return new LongWritable(count++); - } - - @Override - public MapWritable getCurrentValue() throws IOException, InterruptedException { - try { - if (result.hasNext()) { - BindingSet bindingSet = result.next(); - return transformRow(bindingSet); - } - return null; - } catch (QueryEvaluationException e) { - throw new IOException(e); - } - } - - @Override - public float getProgress() throws IOException, 
InterruptedException { - return (closed) ? (1) : (0); - } - - @Override - public void close() throws IOException { - closed = true; - try { - conn.close(); - repo.shutDown(); - } catch (RepositoryException e) { - throw new IOException("Exception occurred closing Repository", e); - } - } - - MapWritable mw = new MapWritable(); - - protected MapWritable transformRow(BindingSet bindingSet) { - mw.clear(); //handle the case of optional bindings. -mbraun - for (String name : bindingSet.getBindingNames()) { - final Text key = new Text(name); - final Text value = new Text(bindingSet.getValue(name).stringValue()); - mw.put(key, value); - } - return mw; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java deleted file mode 100644 index 4b369ae..0000000 --- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java +++ /dev/null @@ -1,155 +0,0 @@ -package mvm.mmrts.rdf.partition.mr; - -import com.google.common.io.ByteStreams; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Date; - -/** - * Class SparqlTestDriver - * Date: Oct 28, 2010 - * Time: 2:53:39 PM - */ -public class SparqlTestDriver implements Tool { - - public static void main(String[] args) { - try { - ToolRunner.run(new Configuration(), new SparqlTestDriver(), args); - } catch (Exception e) { - e.printStackTrace(); - } - } - - private Configuration conf; - - public Configuration getConf() { - return conf; - } - - public void setConf(Configuration conf) { - this.conf = conf; - } - - public int run(String[] args) throws IOException, InterruptedException, - ClassNotFoundException { - - //query from file - if(args.length < 2) { - throw new IllegalArgumentException("Usage: hadoop jar mvm.mmrts.rdf.partition.mr.SparqlTestDriver <local query file> outputFile"); - } - - FileInputStream fis = new FileInputStream(args[0]); - String query = new String(ByteStreams.toByteArray(fis)); - fis.close(); - - Job job = new Job(conf); - job.setJarByClass(SparqlTestDriver.class); - - // set up cloudbase input - job.setInputFormatClass(SparqlPartitionStoreInputFormat.class); - SparqlPartitionStoreInputFormat.setInputInfo(job, "root", "password".getBytes()); - SparqlPartitionStoreInputFormat.setZooKeeperInstance(job, "stratus", "10.40.190.113:2181"); - SparqlPartitionStoreInputFormat.setLongJob(job, null); - SparqlPartitionStoreInputFormat.setTable(job, "partitionRdf"); - - long startTime_l = 1303811164088l; - long ttl = 86400000; - - //set query -// String query = "PREFIX tdp: <http://here/2010/tracked-data-provenance/ns#>\n" + -// "PREFIX rdf: 
<http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" + -// "PREFIX mvmpart: <urn:mvm.mmrts.partition.rdf/08/2011#>\n" + -// "SELECT * WHERE\n" + -// "{\n" + -// "?id tdp:reportedAt ?timestamp. \n" + -// "FILTER(mvmpart:timeRange(?id, tdp:reportedAt, 1314380456900 , 1314384056900 , 'XMLDATETIME')).\n" + -// "?id tdp:performedBy ?system.\n" + -// "} \n"; -// -// String query2 = "PREFIX hb: <http://here/2010/tracked-data-provenance/heartbeat/ns#>\n" + -// "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" + -// "PREFIX mvmpart: <urn:mvm.mmrts.partition.rdf/08/2011#>\n" + -// "SELECT * WHERE\n" + -// "{\n" + -// "?id hb:timeStamp ?timestamp. \n" + -// "FILTER(mvmpart:timeRange(?id, hb:timeStamp, 1314360009522 , 1314367209522 , 'TIMESTAMP')).\n" + -// "?id hb:count ?count.\n" + -// "?id hb:systemName ?systemName.\n" + -// "} "; - - System.out.println(query); - System.out.println(); -// System.out.println(query2); - - SparqlPartitionStoreInputFormat.setSparqlQueries(job, query); -// SparqlCloudbaseStoreInputFormat.setStartTime(job, 1309956861000l + ""); -// SparqlCloudbaseStoreInputFormat.setTtl(job, 86400000 + ""); - - // set input output of the particular job - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(Text.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Text.class); - - //job.setOutputFormatClass(FileOutputFormat.class); - - - // set mapper and reducer classes - job.setMapperClass(MyTempMapper.class); - job.setReducerClass(Reducer.class); - job.setNumReduceTasks(1); - - // set output - Path outputDir = new Path(args[1]); - FileSystem dfs = FileSystem.get(outputDir.toUri(), conf); - if (dfs.exists(outputDir)) - dfs.delete(outputDir, true); - - FileOutputFormat.setOutputPath(job, outputDir); - - // Submit the job - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int exitCode = job.waitForCompletion(true) ? 
0 : 1; - - if (exitCode == 0) { - Date end_time = new Date(); - System.out.println("Job ended: " + end_time); - System.out.println("The job took " - + (end_time.getTime() - startTime.getTime()) / 1000 - + " seconds."); - return (int) job - .getCounters() - .findCounter("org.apache.hadoop.mapred.Task$Counter", - "REDUCE_OUTPUT_RECORDS").getValue(); - } else { - System.out.println("Job Failed!!!"); - } - - return -1; - } - - public static class MyTempMapper extends Mapper<LongWritable, MapWritable, Text, Text> { - Text outKey = new Text(); - Text outValue = new Text("partition"); - @Override - protected void map(LongWritable key, MapWritable value, Context context) throws IOException, InterruptedException { - outKey.set(value.values().toString()); - context.write(outKey, outValue); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java deleted file mode 100644 index 80255ba..0000000 --- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java +++ /dev/null @@ -1,154 +0,0 @@ -package mvm.mmrts.rdf.partition.mr; - -import com.google.common.io.ByteStreams; -import com.google.common.primitives.Bytes; -import mvm.mmrts.rdf.partition.PartitionConstants; -import mvm.mmrts.rdf.partition.utils.RdfIO; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.openrdf.model.Statement; - -import java.io.IOException; -import java.util.Date; - -/** - * Class SparqlTestDriver - * Date: Oct 28, 2010 - * Time: 2:53:39 PM - */ -public class TestDriver implements Tool { - - public static void main(String[] args) { - try { - ToolRunner.run(new Configuration(), new TestDriver(), args); - } catch (Exception e) { - e.printStackTrace(); - } - } - - private Configuration conf; - - public Configuration getConf() { - return conf; - } - - public void setConf(Configuration conf) { - this.conf = conf; - } - - public int run(String[] args) throws IOException, InterruptedException, - ClassNotFoundException { - - Job job = new Job(conf); - job.setJarByClass(TestDriver.class); - - FileInputFormat.addInputPaths(job, "/temp/rpunnoose/results.txt"); - job.setInputFormatClass(TextInputFormat.class); - - // set input output of the particular job - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(MapWritable.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Text.class); - - job.setOutputFormatClass(TextOutputFormat.class); - - // set mapper and reducer classes - job.setMapperClass(SubjectMapWrMapper.class); - 
job.setReducerClass(OutMapWrReducer.class); - job.setNumReduceTasks(1); -// job.setNumReduceTasks(0); - - // set output - Path outputDir = new Path("/temp/rpunnoose/partBS"); - FileSystem dfs = FileSystem.get(outputDir.toUri(), conf); - if (dfs.exists(outputDir)) - dfs.delete(outputDir, true); - - FileOutputFormat.setOutputPath(job, outputDir); - - // Submit the job - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int exitCode = job.waitForCompletion(true) ? 0 : 1; - - if (exitCode == 0) { - Date end_time = new Date(); - System.out.println("Job ended: " + end_time); - System.out.println("The job took " - + (end_time.getTime() - startTime.getTime()) / 1000 - + " seconds."); - return (int) job - .getCounters() - .findCounter("org.apache.hadoop.mapred.Task$Counter", - "REDUCE_OUTPUT_RECORDS").getValue(); - } else { - System.out.println("Job Failed!!!"); - } - - return -1; - } - - public static class SubjectMapWrMapper extends Mapper<LongWritable, Text, Text, MapWritable> { - Text outKey = new Text(); - final String ID = "id"; - final Text ID_TXT = new Text(ID); - final String PERF_AT = "performedBy"; - final Text PERF_AT_TXT = new Text("system"); - final String REPORT_AT = "reportedAt"; - final Text REPORT_AT_TXT = new Text("timestamp"); - final String TYPE = "type"; - final Text TYPE_TXT = new Text(TYPE); - - @Override - protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { - String s = value.toString(); - int i = s.lastIndexOf("\0"); - Statement stmt = RdfIO.readStatement(ByteStreams.newDataInput(s.substring(0, i).getBytes()), PartitionConstants.VALUE_FACTORY); - String predStr = stmt.getPredicate().stringValue(); - if (!predStr.contains(PERF_AT) && !predStr.contains(REPORT_AT) && !predStr.contains(TYPE)) - return; - - outKey.set(stmt.getSubject().stringValue()); - MapWritable mw = new MapWritable(); - mw.put(ID_TXT, outKey); - if (predStr.contains(PERF_AT)) - mw.put(PERF_AT_TXT, new Text(stmt.getObject().stringValue())); - else if (predStr.contains(REPORT_AT)) - mw.put(REPORT_AT_TXT, new Text(stmt.getObject().stringValue())); - else if (predStr.contains(TYPE)) - mw.put(TYPE_TXT, new Text(stmt.getObject().stringValue())); - - context.write(outKey, mw); - } - } - - public static class OutMapWrReducer extends Reducer<Text, MapWritable, Text, Text> { - final Text PART = new Text("partitionBS"); - Text outKey = new Text(); - - @Override - protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException { - MapWritable mw = new MapWritable(); - for (MapWritable value : values) { - mw.putAll(value); - } - outKey.set(mw.values().toString()); - context.write(outKey, PART); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java deleted file mode 100644 index 2b4565f..0000000 --- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java +++ /dev/null @@ -1,229 +0,0 @@ -package mvm.mmrts.rdf.partition.mr.compat; - -import cloudbase.core.CBConstants; -import 
cloudbase.core.client.mapreduce.CloudbaseInputFormat; -import cloudbase.core.client.mapreduce.CloudbaseOutputFormat; -import cloudbase.core.data.Key; -import cloudbase.core.data.Mutation; -import cloudbase.core.data.Range; -import cloudbase.core.data.Value; -import cloudbase.core.security.ColumnVisibility; -import mvm.mmrts.rdf.partition.PartitionConstants; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -import java.io.IOException; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; - -/** - * MMRTS-148 Need to move the shard index from the partition table to the shardIndex table - * Class MoveShardIndexTool - * Date: Dec 8, 2011 - * Time: 4:11:40 PM - */ -public class ChangeShardDateFormatTool implements Tool { - public static final String CB_USERNAME_PROP = "cb.username"; - public static final String CB_PWD_PROP = "cb.pwd"; - public static final String CB_ZK_PROP = "cb.zk"; - public static final String CB_INSTANCE_PROP = "cb.instance"; - public static final String PARTITION_TABLE_PROP = "partition.table"; - public static final String OLD_DATE_FORMAT_PROP = "date.format.old"; - public static final String NEW_DATE_FORMAT_PROP = "date.format.new"; - public static final String OLD_DATE_SHARD_DELIM = "date.shard.delim.old"; - public static final String NEW_DATE_SHARD_DELIM = "date.shard.delim.new"; - - - private Configuration conf; - - private String userName = "root"; - private String pwd = "password"; - private String instance = "stratus"; - private String zk = "10.40.190.113:2181"; - private String partitionTable = "rdfPartition"; - private String oldDateFormat = "yyyy-MM"; - private String newDateFormat = "yyyyMMdd"; - private String oldDateDelim = "-"; - private String newDateDelim = "_"; - - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - public static void main(String[] args) { - try { - ToolRunner.run(new Configuration(), new ChangeShardDateFormatTool(), args); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Override - public int run(String[] args) throws Exception { - runJob(args); - return 0; - } - - public long runJob(String[] args) throws Exception { - //faster - conf.setBoolean("mapred.map.tasks.speculative.execution", false); - conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); - conf.set("io.sort.mb", "256"); - - zk = conf.get(CB_ZK_PROP, zk); - instance = conf.get(CB_INSTANCE_PROP, instance); - userName = conf.get(CB_USERNAME_PROP, userName); - pwd = conf.get(CB_PWD_PROP, pwd); - partitionTable = conf.get(PARTITION_TABLE_PROP, partitionTable); - oldDateFormat = conf.get(OLD_DATE_FORMAT_PROP, oldDateFormat); - newDateFormat = conf.get(NEW_DATE_FORMAT_PROP, newDateFormat); - oldDateDelim = conf.get(OLD_DATE_SHARD_DELIM, oldDateDelim); - newDateDelim = conf.get(NEW_DATE_SHARD_DELIM, newDateDelim); - conf.set(NEW_DATE_FORMAT_PROP, newDateFormat); - conf.set(OLD_DATE_FORMAT_PROP, oldDateFormat); - conf.set(PARTITION_TABLE_PROP, partitionTable); - conf.set(OLD_DATE_SHARD_DELIM, oldDateDelim); - conf.set(NEW_DATE_SHARD_DELIM, newDateDelim); - - Job job = new Job(conf); - job.setJarByClass(ChangeShardDateFormatTool.class); - - job.setInputFormatClass(CloudbaseInputFormat.class); - //TODO: How should I send in Auths? 
- CloudbaseInputFormat.setInputInfo(job, userName, pwd.getBytes(), - partitionTable, CBConstants.NO_AUTHS); - CloudbaseInputFormat.setZooKeeperInstance(job, instance, zk); - - job.setMapperClass(ChangeDateFormatMapper.class); - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(Mutation.class); - - job.setOutputFormatClass(CloudbaseOutputFormat.class); - CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, partitionTable); - CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk); - - job.setNumReduceTasks(0); - - // Submit the job - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int exitCode = job.waitForCompletion(true) ? 0 : 1; - - if (exitCode == 0) { - Date end_time = new Date(); - System.out.println("Job ended: " + end_time); - System.out.println("The job took " - + (end_time.getTime() - startTime.getTime()) / 1000 - + " seconds."); - return job - .getCounters() - .findCounter("org.apache.hadoop.mapred.Task$Counter", - "REDUCE_OUTPUT_RECORDS").getValue(); - } else { - System.out.println("Job Failed!!!"); - } - - return -1; - } - - public static class ChangeDateFormatMapper extends Mapper<Key, Value, Text, Mutation> { - private SimpleDateFormat oldDateFormat_df; - private SimpleDateFormat newDateFormat_df; - private Text partTableTxt; - private String newDateDelim; - private String oldDateDelim; - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - String oldDateFormat = context.getConfiguration().get(OLD_DATE_FORMAT_PROP); - if (oldDateFormat == null) - throw new IllegalArgumentException("Old Date Format property cannot be null"); - - oldDateFormat_df = new SimpleDateFormat(oldDateFormat); - - String newDateFormat = context.getConfiguration().get(NEW_DATE_FORMAT_PROP); - if (newDateFormat == null) - throw new IllegalArgumentException("New Date Format property cannot be null"); - - newDateFormat_df = new SimpleDateFormat(newDateFormat); - - String partTable = context.getConfiguration().get(PARTITION_TABLE_PROP); - if (partTable == null) - throw new IllegalArgumentException("Partition Table property cannot be null"); - - partTableTxt = new Text(partTable); - - oldDateDelim = context.getConfiguration().get(OLD_DATE_SHARD_DELIM); - if (oldDateDelim == null) - throw new IllegalArgumentException("Old Date Shard Delimiter property cannot be null"); - - newDateDelim = context.getConfiguration().get(NEW_DATE_SHARD_DELIM); - if (newDateDelim == null) - throw new IllegalArgumentException("New Date Shard Delimiter property cannot be null"); - - } - - @Override - protected void map(Key key, Value value, Context context) throws IOException, InterruptedException { - try { - String cf = key.getColumnFamily().toString(); - if ("event".equals(cf) || "index".equals(cf)) { - String shard = key.getRow().toString(); - int shardIndex = shard.lastIndexOf(oldDateDelim); - if (shardIndex == -1) - return; //no shard? 
- String date_s = shard.substring(0, shardIndex); - String shardValue = shard.substring(shardIndex + 1, shard.length()); - - Date date = oldDateFormat_df.parse(date_s); - String newShard = newDateFormat_df.format(date) + newDateDelim + shardValue; - - Mutation mutation = new Mutation(new Text(newShard)); - mutation.put(key.getColumnFamily(), key.getColumnQualifier(), - new ColumnVisibility(key.getColumnVisibility()), System.currentTimeMillis(), value); - context.write(partTableTxt, mutation); - - //delete - mutation = new Mutation(key.getRow()); - mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier(), System.currentTimeMillis()); - - context.write(partTableTxt, mutation); - } else { - //shard index - String shard = key.getColumnFamily().toString(); - int shardIndex = shard.lastIndexOf(oldDateDelim); - if (shardIndex == -1) - return; //no shard? - - String date_s = shard.substring(0, shardIndex); - String shardValue = shard.substring(shardIndex + 1, shard.length()); - - Date date = oldDateFormat_df.parse(date_s); - String newShard = newDateFormat_df.format(date) + newDateDelim + shardValue; - - Mutation mutation = new Mutation(key.getRow()); - mutation.put(new Text(newShard), key.getColumnQualifier(), - new ColumnVisibility(key.getColumnVisibility()), System.currentTimeMillis(), value); - - //delete - mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier(), System.currentTimeMillis()); - context.write(partTableTxt, mutation); - } - } catch (ParseException pe) { - //only do work for the rows that match the old date format - //throw new IOException(pe); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java deleted file mode 100644 index ba2eece..0000000 --- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java +++ /dev/null @@ -1,171 +0,0 @@ -package mvm.mmrts.rdf.partition.mr.compat; - -import cloudbase.core.CBConstants; -import cloudbase.core.client.mapreduce.CloudbaseInputFormat; -import cloudbase.core.client.mapreduce.CloudbaseOutputFormat; -import cloudbase.core.data.Key; -import cloudbase.core.data.Mutation; -import cloudbase.core.data.Range; -import cloudbase.core.data.Value; -import cloudbase.core.security.ColumnVisibility; -import mvm.mmrts.rdf.partition.PartitionConstants; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -import java.io.IOException; -import java.util.Collections; -import java.util.Date; - -/** - * MMRTS-148 Need to move the shard index from the partition table to the shardIndex table - * Class MoveShardIndexTool - * Date: Dec 8, 2011 - * Time: 4:11:40 PM - */ -public class MoveShardIndexTool implements Tool { - public static final String CB_USERNAME_PROP = "cb.username"; - public static final String CB_PWD_PROP = "cb.pwd"; - public static final String CB_ZK_PROP = "cb.zk"; - public static final String CB_INSTANCE_PROP = "cb.instance"; - public static final String PARTITION_TABLE_PROP = 
"partition.table"; - public static final String SHARD_INDEX_TABLE_PROP = "shard.index.table"; - public static final String SHARD_INDEX_DELETE_PROP = "shard.index.delete"; - - - private Configuration conf; - - private String userName = "root"; - private String pwd = "password"; - private String instance = "stratus"; - private String zk = "10.40.190.113:2181"; - private String partitionTable = "rdfPartition"; - private String shardIndexTable = "rdfShardIndex"; - - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - public static void main(String[] args) { - try { - ToolRunner.run(new Configuration(), new MoveShardIndexTool(), args); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Override - public int run(String[] args) throws Exception { - runJob(args); - return 0; - } - - public long runJob(String[] args) throws Exception { - //faster - conf.setBoolean("mapred.map.tasks.speculative.execution", false); - conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); - conf.set("io.sort.mb", "256"); - - zk = conf.get(CB_ZK_PROP, zk); - instance = conf.get(CB_INSTANCE_PROP, instance); - userName = conf.get(CB_USERNAME_PROP, userName); - pwd = conf.get(CB_PWD_PROP, pwd); - partitionTable = conf.get(PARTITION_TABLE_PROP, partitionTable); - shardIndexTable = conf.get(SHARD_INDEX_TABLE_PROP, shardIndexTable); - conf.set(SHARD_INDEX_TABLE_PROP, shardIndexTable); - conf.set(PARTITION_TABLE_PROP, partitionTable); - - Job job = new Job(conf); - job.setJarByClass(MoveShardIndexTool.class); - - job.setInputFormatClass(CloudbaseInputFormat.class); - //TODO: How should I send in Auths? - CloudbaseInputFormat.setInputInfo(job, userName, pwd.getBytes(), - partitionTable, CBConstants.NO_AUTHS); - CloudbaseInputFormat.setZooKeeperInstance(job, instance, zk); - CloudbaseInputFormat.setRanges(job, Collections.singleton( - new Range( - new Text(PartitionConstants.URI_MARKER_STR), - new Text(PartitionConstants.PLAIN_LITERAL_MARKER_STR)))); - - job.setMapperClass(ShardKeyValueToMutationMapper.class); - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(Mutation.class); - - job.setOutputFormatClass(CloudbaseOutputFormat.class); - CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, shardIndexTable); - CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk); - - job.setNumReduceTasks(0); - - // Submit the job - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int exitCode = job.waitForCompletion(true) ? 
0 : 1; - - if (exitCode == 0) { - Date end_time = new Date(); - System.out.println("Job ended: " + end_time); - System.out.println("The job took " - + (end_time.getTime() - startTime.getTime()) / 1000 - + " seconds."); - return job - .getCounters() - .findCounter("org.apache.hadoop.mapred.Task$Counter", - "REDUCE_OUTPUT_RECORDS").getValue(); - } else { - System.out.println("Job Failed!!!"); - } - - return -1; - } - - public static class ShardKeyValueToMutationMapper extends Mapper<Key, Value, Text, Mutation> { - private Text shardTableTxt; - private Text partTableTxt; - protected boolean deletePrevShardIndex; - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - String shardTable = context.getConfiguration().get(SHARD_INDEX_TABLE_PROP); - if (shardTable == null) - throw new IllegalArgumentException("Shard Table property cannot be null"); - - shardTableTxt = new Text(shardTable); - - String partTable = context.getConfiguration().get(PARTITION_TABLE_PROP); - if (partTable == null) - throw new IllegalArgumentException("Partition Table property cannot be null"); - - partTableTxt = new Text(partTable); - - deletePrevShardIndex = context.getConfiguration().getBoolean(SHARD_INDEX_DELETE_PROP, false); - System.out.println("Deleting shard index from previous: " + deletePrevShardIndex + " Part: " + partTableTxt); - } - - @Override - protected void map(Key key, Value value, Context context) throws IOException, InterruptedException { - Mutation mutation = new Mutation(key.getRow()); - mutation.put(key.getColumnFamily(), key.getColumnQualifier(), - new ColumnVisibility(key.getColumnVisibility()), System.currentTimeMillis(), value); - - context.write(shardTableTxt, mutation); - - if (deletePrevShardIndex) { - mutation = new Mutation(key.getRow()); - mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier(), System.currentTimeMillis()); - - context.write(partTableTxt, mutation); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java deleted file mode 100644 index b347a56..0000000 --- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java +++ /dev/null @@ -1,155 +0,0 @@ -package mvm.mmrts.rdf.partition.mr.fileinput; - -import mvm.mmrts.rdf.partition.utils.RdfIO; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.openrdf.model.Statement; -import org.openrdf.rio.*; - -import java.io.IOException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -/** - * Be able to input multiple rdf formatted files. Convert from rdf format to statements. 
- * Class RdfFileInputFormat - * Date: May 16, 2011 - * Time: 2:11:24 PM - */ -public class RdfFileInputFormat extends FileInputFormat<LongWritable, BytesWritable> { - - public static final String RDF_FILE_FORMAT = "mvm.mmrts.rdf.cloudbase.sail.mr.fileinput.rdfformat"; - - @Override - public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit inputSplit, - TaskAttemptContext taskAttemptContext) - throws IOException, InterruptedException { - return new RdfFileRecordReader(); - } - - private class RdfFileRecordReader extends RecordReader<LongWritable, BytesWritable> implements RDFHandler { - - boolean closed = false; - long count = 0; - BlockingQueue<BytesWritable> queue = new LinkedBlockingQueue<BytesWritable>(); - int total = 0; - - @Override - public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - FileSplit fileSplit = (FileSplit) inputSplit; - Configuration conf = taskAttemptContext.getConfiguration(); - String rdfForm_s = conf.get(RDF_FILE_FORMAT, RDFFormat.RDFXML.getName()); - RDFFormat rdfFormat = RDFFormat.valueOf(rdfForm_s); - - Path file = fileSplit.getPath(); - FileSystem fs = file.getFileSystem(conf); - FSDataInputStream fileIn = fs.open(fileSplit.getPath()); - - RDFParser rdfParser = Rio.createParser(rdfFormat); - rdfParser.setRDFHandler(this); - try { - rdfParser.parse(fileIn, ""); - } catch (Exception e) { - throw new IOException(e); - } - fileIn.close(); - total = queue.size(); - //TODO: Make this threaded so that you don't hold too many statements before sending them - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - return queue.size() > 0; - } - - @Override - public LongWritable getCurrentKey() throws IOException, InterruptedException { - return new LongWritable(count++); - } - - @Override - public BytesWritable getCurrentValue() throws IOException, InterruptedException { - return queue.poll(); - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return ((float) (total - queue.size())) / ((float) total); - } - - @Override - public void close() throws IOException { - closed = true; - } - - @Override - public void startRDF() throws RDFHandlerException { - } - - @Override - public void endRDF() throws RDFHandlerException { - } - - @Override - public void handleNamespace(String s, String s1) throws RDFHandlerException { - } - - @Override - public void handleStatement(Statement statement) throws RDFHandlerException { - try { - byte[] stmt_bytes = RdfIO.writeStatement(statement, true); - queue.add(new BytesWritable(stmt_bytes)); - } catch (IOException e) { - throw new RDFHandlerException(e); - } - } - - @Override - public void handleComment(String s) throws RDFHandlerException { - } - } -// -// public static RDFParser createRdfParser(RDFFormat rdfFormat) { -// if (RDFFormat.RDFXML.equals(rdfFormat)) { -// return new RDFXMLParserFactory().getParser(); -// } else if (RDFFormat.N3.equals(rdfFormat)) { -// return new N3ParserFactory().getParser(); -// } else if (RDFFormat.NTRIPLES.equals(rdfFormat)) { -// return new NTriplesParserFactory().getParser(); -// } else if (RDFFormat.TRIG.equals(rdfFormat)) { -// return new TriGParserFactory().getParser(); -// } else if (RDFFormat.TRIX.equals(rdfFormat)) { -// return new TriXParserFactory().getParser(); -// } else if (RDFFormat.TURTLE.equals(rdfFormat)) { -// return new TurtleParserFactory().getParser(); -// } -// throw new IllegalArgumentException("Unknown 
RDFFormat[" + rdfFormat + "]"); -// } -// -// public static RDFWriter createRdfWriter(RDFFormat rdfFormat, OutputStream os) { -// if (RDFFormat.RDFXML.equals(rdfFormat)) { -// return new RDFXMLWriterFactory().getWriter(os); -// } else if (RDFFormat.N3.equals(rdfFormat)) { -// return new N3WriterFactory().getWriter(os); -// } else if (RDFFormat.NTRIPLES.equals(rdfFormat)) { -// return new NTriplesWriterFactory().getWriter(os); -// } else if (RDFFormat.TRIG.equals(rdfFormat)) { -// return new TriGWriterFactory().getWriter(os); -// } else if (RDFFormat.TRIX.equals(rdfFormat)) { -// return new TriXWriterFactory().getWriter(os); -// } else if (RDFFormat.TURTLE.equals(rdfFormat)) { -// return new TurtleWriterFactory().getWriter(os); -// } -// throw new IllegalArgumentException("Unknown RDFFormat[" + rdfFormat + "]"); -// } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java deleted file mode 100644 index 12c1a4e..0000000 --- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java +++ /dev/null @@ -1,210 +0,0 @@ -package mvm.mmrts.rdf.partition.mr.fileinput; - -import cloudbase.core.client.mapreduce.CloudbaseOutputFormat; -import cloudbase.core.data.Mutation; -import com.google.common.io.ByteStreams; -import mvm.mmrts.rdf.partition.shard.DateHashModShardValueGenerator; -import mvm.mmrts.rdf.partition.utils.RdfIO; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.openrdf.model.Resource; -import org.openrdf.model.Statement; -import org.openrdf.model.impl.ValueFactoryImpl; - -import java.io.IOException; -import java.util.Date; - -import static mvm.mmrts.rdf.partition.PartitionConstants.*; -import static mvm.mmrts.rdf.partition.PartitionConstants.EMPTY_VALUE; -import static mvm.mmrts.rdf.partition.utils.RdfIO.writeStatement; -import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue; - -/** - * Do bulk import of rdf files - * Class RdfFileInputToCloudbaseTool - * Date: May 16, 2011 - * Time: 3:12:16 PM - */ -public class RdfFileInputToCloudbaseTool implements Tool { - - public static final String CB_USERNAME_PROP = "cb.username"; - public static final String CB_PWD_PROP = "cb.pwd"; - public static final String CB_SERVER_PROP = "cb.server"; - public static final String CB_PORT_PROP = "cb.port"; - public static final String CB_INSTANCE_PROP = "cb.instance"; - public static final String CB_TTL_PROP = "cb.ttl"; - public static final String CB_TABLE_PROP = "cb.table"; - - - private Configuration conf; - - private String userName = "root"; - private String pwd = "password"; - private String instance = "stratus"; - private String server = "10.40.190.113"; - private String port = "2181"; - private String table = "partitionRdf"; - - - public 
Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - public static void main(String[] args) { - try { - ToolRunner.run(new Configuration(), new RdfFileInputToCloudbaseTool(), args); - } catch (Exception e) { - e.printStackTrace(); - } - } - - public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException { - //faster - conf.setBoolean("mapred.map.tasks.speculative.execution", false); - conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); - conf.set("io.sort.mb", "256"); - - server = conf.get(CB_SERVER_PROP, server); - port = conf.get(CB_PORT_PROP, port); - instance = conf.get(CB_INSTANCE_PROP, instance); - userName = conf.get(CB_USERNAME_PROP, userName); - pwd = conf.get(CB_PWD_PROP, pwd); - table = conf.get(CB_TABLE_PROP, table); - conf.set(CB_TABLE_PROP, table); - - Job job = new Job(conf); - job.setJarByClass(RdfFileInputToCloudbaseTool.class); - - // set up cloudbase input - job.setInputFormatClass(RdfFileInputFormat.class); - RdfFileInputFormat.addInputPath(job, new Path(args[0])); - - // set input output of the particular job - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(BytesWritable.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Mutation.class); - - job.setOutputFormatClass(CloudbaseOutputFormat.class); - CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, table); - CloudbaseOutputFormat.setZooKeeperInstance(job, instance, server + ":" + port); - - // set mapper and reducer classes - job.setMapperClass(OutSubjStmtMapper.class); - job.setReducerClass(StatementToMutationReducer.class); - - // set output -// Path outputDir = new Path("/temp/sparql-out/testout"); -// FileSystem dfs = FileSystem.get(outputDir.toUri(), conf); -// if (dfs.exists(outputDir)) -// dfs.delete(outputDir, true); -// -// FileOutputFormat.setOutputPath(job, outputDir); - - // Submit the job - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int exitCode = job.waitForCompletion(true) ? 
0 : 1; - - if (exitCode == 0) { - Date end_time = new Date(); - System.out.println("Job ended: " + end_time); - System.out.println("The job took " - + (end_time.getTime() - startTime.getTime()) / 1000 - + " seconds."); - return job - .getCounters() - .findCounter("org.apache.hadoop.mapred.Task$Counter", - "REDUCE_OUTPUT_RECORDS").getValue(); - } else { - System.out.println("Job Failed!!!"); - } - - return -1; - } - - @Override - public int run(String[] args) throws Exception { - runJob(args); - return 0; - } - - public static class OutSubjStmtMapper extends Mapper<LongWritable, BytesWritable, Text, BytesWritable> { - - public OutSubjStmtMapper() { - } - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - } - - @Override - protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException { - Statement statement = RdfIO.readStatement(ByteStreams.newDataInput(value.getBytes()), ValueFactoryImpl.getInstance()); - context.write(new Text(new String(writeValue(statement.getSubject())) + FAMILY_DELIM_STR), value); - } - - } - - public static class StatementToMutationReducer extends Reducer<Text, BytesWritable, Text, Mutation> { - private Text outputTable; - private DateHashModShardValueGenerator gen; - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - outputTable = new Text(context.getConfiguration().get(CB_TABLE_PROP, null)); - gen = new DateHashModShardValueGenerator(); - } - - @Override - protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException { - Resource subject = (Resource) RdfIO.readValue(ByteStreams.newDataInput(key.getBytes()), ValueFactoryImpl.getInstance(), FAMILY_DELIM); - byte[] subj_bytes = writeValue(subject); - String shard = gen.generateShardValue(subject); - Text shard_txt = new Text(shard); - - /** - * Triple -> - *- <subject> <shard>: - *- <shard> event:<subject>\0<predicate>\0<object>\0 - *- <shard> index:<predicate>\1<object>\0 - */ - Mutation m_subj = new Mutation(shard_txt); - for (BytesWritable stmt_bytes : values) { - Statement stmt = RdfIO.readStatement(ByteStreams.newDataInput(stmt_bytes.getBytes()), ValueFactoryImpl.getInstance()); - m_subj.put(DOC, new Text(writeStatement(stmt, true)), EMPTY_VALUE); - m_subj.put(INDEX, new Text(writeStatement(stmt, false)), EMPTY_VALUE); - } - - /** - * TODO: Is this right? - * If the subject does not have any authorizations specified, then anyone can access it. - * But the true authorization check will happen at the predicate/object level, which means that - * the set returned will only be what the person is authorized to see. The shard lookup table has to - * have the lowest level authorization of all the predicate/object authorizations; otherwise, - * a user may not be able to see the correct document.
- */ - Mutation m_shard = new Mutation(new Text(subj_bytes)); - m_shard.put(shard_txt, EMPTY_TXT, EMPTY_VALUE); - - context.write(outputTable, m_subj); - context.write(outputTable, m_shard); - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java ---------------------------------------------------------------------- diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java deleted file mode 100644 index e677d12..0000000 --- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java +++ /dev/null @@ -1,159 +0,0 @@ -package mvm.mmrts.rdf.partition.mr.fileinput; - -import com.google.common.io.ByteStreams; -import mvm.mmrts.rdf.partition.utils.RdfIO; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.openrdf.model.Statement; -import org.openrdf.model.impl.ValueFactoryImpl; - -import java.io.IOException; -import java.util.Date; - -import static mvm.mmrts.rdf.partition.PartitionConstants.FAMILY_DELIM_STR; -import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue; - -/** - * Do bulk import of rdf files - * Class RdfFileInputToCloudbaseTool - * Date: May 16, 2011 - * Time: 3:12:16 PM - */ -public class RdfFileInputToFileTool implements Tool { - - private Configuration conf; - - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - public static void main(String[] args) { - try { - ToolRunner.run(new Configuration(), new RdfFileInputToFileTool(), args); - } catch (Exception e) { - e.printStackTrace(); - } - } - - public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException { - if (args.length < 2) - throw new IllegalArgumentException("Usage: RdfFileInputToFileTool <input directory> <output directory>"); - - //faster - conf.setBoolean("mapred.map.tasks.speculative.execution", false); - conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); - conf.set("io.sort.mb", "256"); - - Job job = new Job(conf); - job.setJarByClass(RdfFileInputToFileTool.class); - - // set up cloudbase input - job.setInputFormatClass(RdfFileInputFormat.class); - RdfFileInputFormat.addInputPath(job, new Path(args[0])); - - // set input output of the particular job - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(Text.class); - job.setOutputKeyClass(NullWritable.class); - job.setOutputValueClass(Text.class); - - - // set mapper and reducer classes - job.setMapperClass(StmtToBytesMapper.class); - job.setReducerClass(StmtBytesReducer.class); - - // set output - job.setOutputFormatClass(TextOutputFormat.class); - Path outputDir = new Path(args[1]); - 
FileSystem dfs = FileSystem.get(outputDir.toUri(), conf); - if (dfs.exists(outputDir)) - dfs.delete(outputDir, true); - - FileOutputFormat.setOutputPath(job, outputDir); - - // Submit the job - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int exitCode = job.waitForCompletion(true) ? 0 : 1; - - if (exitCode == 0) { - Date end_time = new Date(); - System.out.println("Job ended: " + end_time); - System.out.println("The job took " - + (end_time.getTime() - startTime.getTime()) / 1000 - + " seconds."); - return job - .getCounters() - .findCounter("org.apache.hadoop.mapred.Task$Counter", - "REDUCE_OUTPUT_RECORDS").getValue(); - } else { - System.out.println("Job Failed!!!"); - } - - return -1; - } - - @Override - public int run(String[] args) throws Exception { - runJob(args); - return 0; - } - - public static class StmtToBytesMapper extends Mapper<LongWritable, BytesWritable, Text, Text> { - - Text outKey = new Text(); - Text outValue = new Text(); - - public StmtToBytesMapper() { - } - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - } - - @Override - protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException { - Statement statement = RdfIO.readStatement(ByteStreams.newDataInput(value.getBytes()), ValueFactoryImpl.getInstance()); - outKey.set(new String(writeValue(statement.getSubject())) + FAMILY_DELIM_STR); - outValue.set(value.getBytes()); - context.write(outKey, outValue); - } - - } - - public static class StmtBytesReducer extends Reducer<Text, Text, NullWritable, Text> { - - NullWritable outKey = NullWritable.get(); - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - } - - @Override - protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { - for (Text stmt_txt : values) { - context.write(outKey, stmt_txt); - } - } - } -} -
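Note on the removed code: the StatementToMutationReducer in RdfFileInputToCloudbaseTool.java above writes each statement twice under its shard row and adds a subject-to-shard lookup row, as its layout comment describes. The standalone sketch below only illustrates that row layout with readable strings; the shard value, URIs, and the "doc"/"index" labels are hypothetical stand-ins for DateHashModShardValueGenerator output and the DOC/INDEX constants in PartitionConstants, and the actual cells hold RdfIO-encoded bytes rather than printable text.

// Illustrative sketch only: renders the shard row layout described in the
// StatementToMutationReducer comment using plain strings. All literal values
// here are hypothetical stand-ins; the deleted code stores RdfIO-encoded
// bytes produced by writeStatement/writeValue, not readable text.
public class ShardLayoutSketch {

    private static final char DELIM = '\u0000';       // stand-in for the \0 delimiter
    private static final char INDEX_DELIM = '\u0001'; // stand-in for the \1 delimiter

    public static void main(String[] args) {
        String shard = "2011-05_42";          // hypothetical shard value
        String subject = "urn:ex:subject";
        String predicate = "urn:ex:predicate";
        String object = "urn:ex:object";

        // Shard row: one "doc" column and one "index" column per statement.
        System.out.println(shard + "  doc:" + subject + DELIM + predicate + DELIM + object + DELIM);
        System.out.println(shard + "  index:" + predicate + INDEX_DELIM + object + DELIM);

        // Subject row: maps the subject back to the shard that holds its
        // statements, so a lookup can find the shard first and then scan it.
        System.out.println(subject + "  " + shard + ":");
    }
}

The two mutations in the reducer correspond to this output: m_subj produces the first two lines (doc and index columns under the shard row), and m_shard produces the last line (the subject-to-shard lookup), which is the row the authorization caveat in the deleted TODO comment is about.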
