http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputTool.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputTool.java deleted file mode 100644 index c3ddcfd..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputTool.java +++ /dev/null @@ -1,369 +0,0 @@ -package mvm.rya.accumulo.mr.fileinput; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import static com.google.common.base.Preconditions.checkNotNull; -import static mvm.rya.accumulo.AccumuloRdfUtils.extractValue; -import static mvm.rya.accumulo.AccumuloRdfUtils.from; - -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.io.PrintStream; -import java.io.StringReader; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.AccumuloRdfConstants; -import mvm.rya.accumulo.mr.utils.MRUtils; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.resolver.RdfToRyaConversions; -import mvm.rya.api.resolver.RyaTripleContext; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.api.resolver.triple.TripleRowResolver; - -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat; -import org.apache.accumulo.core.client.mapreduce.lib.partition.KeyRangePartitioner; -import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.util.TextUtil; -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import 
org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.openrdf.model.Statement; -import org.openrdf.rio.ParserConfig; -import org.openrdf.rio.RDFFormat; -import org.openrdf.rio.RDFHandler; -import org.openrdf.rio.RDFHandlerException; -import org.openrdf.rio.RDFParseException; -import org.openrdf.rio.RDFParser; -import org.openrdf.rio.nquads.NQuadsParser; - -import com.google.common.base.Preconditions; - -/** - * Take large ntrips files and use MapReduce and Cloudbase - * Bulk ingest techniques to load into the table in our partition format. - * <p/> - * Input: NTrips file - * Map: - * - key : shard row - Text - * - value : stmt in doc triple format - Text - * Partitioner: RangePartitioner - * Reduce: - * - key : all the entries for each triple - Cloudbase Key - * Class BulkNtripsInputTool - * Date: Sep 13, 2011 - * Time: 10:00:17 AM - */ -public class BulkNtripsInputTool extends Configured implements Tool { - - public static final String WORKDIR_PROP = "bulk.n3.workdir"; - - private String userName = "root"; - private String pwd = "root"; - private String instance = "isntance"; - private String zk = "zoo"; - private String ttl = null; - private String workDirBase = "/temp/bulkcb/work"; - private String format = RDFFormat.NQUADS.getName(); - - @Override - public int run(final String[] args) throws Exception { - final Configuration conf = getConf(); - try { - //conf - zk = conf.get(MRUtils.AC_ZK_PROP, zk); - ttl = conf.get(MRUtils.AC_TTL_PROP, ttl); - instance = conf.get(MRUtils.AC_INSTANCE_PROP, instance); - userName = conf.get(MRUtils.AC_USERNAME_PROP, userName); - pwd = conf.get(MRUtils.AC_PWD_PROP, pwd); - workDirBase = conf.get(WORKDIR_PROP, workDirBase); - format = conf.get(MRUtils.FORMAT_PROP, format); - conf.set(MRUtils.FORMAT_PROP, format); - final String inputDir = args[0]; - - ZooKeeperInstance zooKeeperInstance = new ZooKeeperInstance(instance, zk); - Connector connector = zooKeeperInstance.getConnector(userName, new PasswordToken(pwd)); - TableOperations tableOperations = connector.tableOperations(); - - if (conf.get(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS) != null ) { - throw new IllegalArgumentException("Cannot use Bulk N Trips tool with Additional Indexers"); - } - - String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null); - if (tablePrefix != null) - RdfCloudTripleStoreConstants.prefixTables(tablePrefix); - String[] tables = {tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX, - tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, - tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX}; - Collection<Job> jobs = new ArrayList<Job>(); - for (final String tableName : tables) { - PrintStream out = null; - try { - String workDir = workDirBase + "/" + tableName; - System.out.println("Loading data into table[" + tableName + "]"); - - Job job = new Job(new Configuration(conf), "Bulk Ingest load data to Generic RDF Table[" + tableName + "]"); - job.setJarByClass(this.getClass()); - //setting long job - Configuration jobConf = job.getConfiguration(); - jobConf.setBoolean("mapred.map.tasks.speculative.execution", false); - jobConf.setBoolean("mapred.reduce.tasks.speculative.execution", false); - jobConf.set("io.sort.mb", jobConf.get("io.sort.mb", "256")); - jobConf.setBoolean("mapred.compress.map.output", true); -// jobConf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); //TODO: I would like LZO compression - - 
job.setInputFormatClass(TextInputFormat.class); - - job.setMapperClass(ParseNtripsMapper.class); - job.setMapOutputKeyClass(Key.class); - job.setMapOutputValueClass(Value.class); - - job.setCombinerClass(OutStmtMutationsReducer.class); - job.setReducerClass(OutStmtMutationsReducer.class); - job.setOutputFormatClass(AccumuloFileOutputFormat.class); - // AccumuloFileOutputFormat.setZooKeeperInstance(jobConf, instance, zk); - - jobConf.set(ParseNtripsMapper.TABLE_PROPERTY, tableName); - - TextInputFormat.setInputPaths(job, new Path(inputDir)); - - FileSystem fs = FileSystem.get(conf); - Path workPath = new Path(workDir); - if (fs.exists(workPath)) - fs.delete(workPath, true); - - //make failures dir - Path failures = new Path(workDir, "failures"); - fs.delete(failures, true); - fs.mkdirs(new Path(workDir, "failures")); - - AccumuloFileOutputFormat.setOutputPath(job, new Path(workDir + "/files")); - - out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt")))); - - if (!tableOperations.exists(tableName)) - tableOperations.create(tableName); - Collection<Text> splits = tableOperations.getSplits(tableName, Integer.MAX_VALUE); - for (Text split : splits) - out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split)))); - - job.setNumReduceTasks(splits.size() + 1); - out.close(); - - job.setPartitionerClass(KeyRangePartitioner.class); - RangePartitioner.setSplitFile(job, workDir + "/splits.txt"); - - jobConf.set(WORKDIR_PROP, workDir); - - job.submit(); - jobs.add(job); - - } catch (Exception re) { - throw new RuntimeException(re); - } finally { - if (out != null) - out.close(); - } - } - - for (Job job : jobs) { - while (!job.isComplete()) { - Thread.sleep(1000); - } - } - - for(String tableName : tables) { - String workDir = workDirBase + "/" + tableName; - String filesDir = workDir + "/files"; - String failuresDir = workDir + "/failures"; - - FileSystem fs = FileSystem.get(conf); - - //make sure that the "accumulo" user can read/write/execute into these directories this path - fs.setPermission(new Path(filesDir), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); - fs.setPermission(new Path(failuresDir), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); - - tableOperations.importDirectory( - tableName, - filesDir, - failuresDir, - false); - - } - - } catch (Exception e ){ - throw new RuntimeException(e); - } - - return 0; - } - - public static void main(String[] args) throws Exception { - ToolRunner.run(new Configuration(), new BulkNtripsInputTool(), args); - } - - /** - * input: ntrips format triple - * <p/> - * output: key: shard row from generator - * value: stmt in serialized format (document format) - */ - public static class ParseNtripsMapper extends Mapper<LongWritable, Text, Key, Value> { - public static final String TABLE_PROPERTY = "parsentripsmapper.table"; - - private RDFParser parser; - private String rdfFormat; - private String namedGraph; - private RyaTripleContext ryaContext; - private TripleRowResolver rowResolver; - - @Override - protected void setup(final Context context) throws IOException, InterruptedException { - super.setup(context); - Configuration conf = context.getConfiguration(); - final String table = conf.get(TABLE_PROPERTY); - Preconditions.checkNotNull(table, "Set the " + TABLE_PROPERTY + " property on the map reduce job"); - this.ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf)); - rowResolver = ryaContext.getTripleResolver(); - - final String cv_s = 
conf.get(MRUtils.AC_CV_PROP); - final byte[] cv = cv_s == null ? null : cv_s.getBytes(); - rdfFormat = conf.get(MRUtils.FORMAT_PROP); - checkNotNull(rdfFormat, "Rdf format cannot be null"); - - namedGraph = conf.get(MRUtils.NAMED_GRAPH_PROP); - - parser = new NQuadsParser(); - parser.setParserConfig(new ParserConfig(true, true, true, RDFParser.DatatypeHandling.VERIFY)); - parser.setRDFHandler(new RDFHandler() { - - @Override - public void startRDF() throws RDFHandlerException { - - } - - @Override - public void endRDF() throws RDFHandlerException { - - } - - @Override - public void handleNamespace(String s, String s1) throws RDFHandlerException { - - } - - @Override - public void handleStatement(Statement statement) throws RDFHandlerException { - try { - RyaStatement rs = RdfToRyaConversions.convertStatement(statement); - if(rs.getColumnVisibility() == null) { - rs.setColumnVisibility(cv); - } - - // Inject the specified context into the statement. - if(namedGraph != null){ - rs.setContext(new RyaURI(namedGraph)); - } else if (statement.getContext() != null) { - rs.setContext(new RyaURI(statement.getContext().toString())); - } - - Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT,TripleRow> serialize = rowResolver.serialize(rs); - - if (table.contains(RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX)) { - TripleRow tripleRow = serialize.get(TABLE_LAYOUT.SPO); - context.write( - from(tripleRow), - extractValue(tripleRow) - ); - } else if (table.contains(RdfCloudTripleStoreConstants.TBL_PO_SUFFIX)) { - TripleRow tripleRow = serialize.get(TABLE_LAYOUT.PO); - context.write( - from(tripleRow), - extractValue(tripleRow) - ); - } else if (table.contains(RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX)) { - TripleRow tripleRow = serialize.get(TABLE_LAYOUT.OSP); - context.write( - from(tripleRow), - extractValue(tripleRow) - ); - } else - throw new IllegalArgumentException("Unrecognized table[" + table + "]"); - - } catch (Exception e) { - throw new RDFHandlerException(e); - } - } - - @Override - public void handleComment(String s) throws RDFHandlerException { - - } - }); - } - - @Override - public void map(LongWritable key, Text value, Context output) - throws IOException, InterruptedException { - String rdf = value.toString(); - try { - parser.parse(new StringReader(rdf), ""); - } catch (RDFParseException e) { - System.out.println("Line[" + rdf + "] cannot be formatted with format[" + rdfFormat + "]. Exception[" + e.getMessage() + "]"); - } catch (Exception e) { - e.printStackTrace(); - throw new IOException("Exception occurred parsing triple[" + rdf + "]"); - } - } - } - - public static class OutStmtMutationsReducer extends Reducer<Key, Value, Key, Value> { - - public void reduce(Key key, Iterable<Value> values, Context output) - throws IOException, InterruptedException { - output.write(key, AccumuloRdfConstants.EMPTY_VALUE); - } - } -}
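[Editor's note] BulkNtripsInputTool (removed above) reads its Accumulo connection settings from the Hadoop Configuration — the ac.* and rdf.* keys defined in MRUtils later in this commit — and takes the HDFS directory of N-Quads files as its first program argument. As a rough driver sketch only (not part of the removed code; instance, zookeeper, password and path values are placeholders), it could have been launched like this:

    import mvm.rya.accumulo.mr.fileinput.BulkNtripsInputTool;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class BulkIngestDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Accumulo connection settings the tool reads (see MRUtils below); values are placeholders
            conf.set("ac.instance", "myInstance");   // MRUtils.AC_INSTANCE_PROP
            conf.set("ac.zk", "zk1:2181");           // MRUtils.AC_ZK_PROP
            conf.set("ac.username", "root");         // MRUtils.AC_USERNAME_PROP
            conf.set("ac.pwd", "secret");            // MRUtils.AC_PWD_PROP
            conf.set("rdf.tablePrefix", "rya_");     // MRUtils.TABLE_PREFIX_PROPERTY
            conf.set("rdf.format", "N-Quads");       // MRUtils.FORMAT_PROP
            // First argument: HDFS directory containing the N-Quads input files
            ToolRunner.run(conf, new BulkNtripsInputTool(), new String[]{"/data/nquads"});
        }
    }

The tool then runs one bulk-load job per index table (SPO, PO, OSP), writes RFiles under bulk.n3.workdir, and imports them with importDirectory.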
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputByLineTool.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputByLineTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputByLineTool.java deleted file mode 100644 index 5a872a0..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputByLineTool.java +++ /dev/null @@ -1,251 +0,0 @@ -package mvm.rya.accumulo.mr.fileinput; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import java.io.IOException; -import java.io.StringReader; -import java.util.Collection; -import java.util.Date; -import java.util.Map; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.AccumuloRdfConstants; -import mvm.rya.accumulo.RyaTableMutationsFactory; -import mvm.rya.accumulo.mr.utils.MRUtils; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.resolver.RdfToRyaConversions; -import mvm.rya.api.resolver.RyaTripleContext; - -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.data.Mutation; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.openrdf.model.Statement; -import org.openrdf.rio.RDFFormat; -import org.openrdf.rio.RDFHandler; -import org.openrdf.rio.RDFHandlerException; -import org.openrdf.rio.RDFParser; -import org.openrdf.rio.Rio; - -/** - * Do bulk import of rdf files - * Class RdfFileInputTool2 - * Date: May 16, 2011 - * Time: 3:12:16 PM - */ -public class RdfFileInputByLineTool implements Tool { - - private Configuration conf = new Configuration(); - - private String userName = "root"; - private String pwd = "password"; - private String instance = "instance"; - private String zk = "zoo"; - private String tablePrefix = null; - private RDFFormat format = RDFFormat.NTRIPLES; - - public Configuration getConf() { - return conf; - } - - public void setConf(Configuration conf) { - this.conf = conf; - } - - public static void main(String[] args) { - try { - 
ToolRunner.run(new Configuration(), new RdfFileInputByLineTool(), args); - } catch (Exception e) { - e.printStackTrace(); - } - } - - public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException, AccumuloSecurityException { - conf.setBoolean("mapred.map.tasks.speculative.execution", false); - conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); - conf.set("io.sort.mb", "256"); - conf.setLong("mapred.task.timeout", 600000000); - - zk = conf.get(MRUtils.AC_ZK_PROP, zk); - instance = conf.get(MRUtils.AC_INSTANCE_PROP, instance); - userName = conf.get(MRUtils.AC_USERNAME_PROP, userName); - pwd = conf.get(MRUtils.AC_PWD_PROP, pwd); - format = RDFFormat.valueOf(conf.get(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.toString())); - - String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF); - - Job job = new Job(conf); - job.setJarByClass(RdfFileInputByLineTool.class); - - // set up cloudbase input - job.setInputFormatClass(TextInputFormat.class); - FileInputFormat.addInputPath(job, new Path(args[0])); - - // set input output of the particular job - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(Mutation.class); - - job.setOutputFormatClass(AccumuloOutputFormat.class); - AccumuloOutputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd.getBytes())); - AccumuloOutputFormat.setCreateTables(job, true); - AccumuloOutputFormat.setDefaultTableName(job, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); - AccumuloOutputFormat.setZooKeeperInstance(job, instance, zk); - - // set mapper and reducer classes - job.setMapperClass(TextToMutationMapper.class); - job.setNumReduceTasks(0); - - // Submit the job - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int exitCode = job.waitForCompletion(true) ? 
0 : 1; - - if (exitCode == 0) { - Date end_time = new Date(); - System.out.println("Job ended: " + end_time); - System.out.println("The job took " - + (end_time.getTime() - startTime.getTime()) / 1000 - + " seconds."); - return job - .getCounters() - .findCounter("org.apache.hadoop.mapred.Task$Counter", - "REDUCE_OUTPUT_RECORDS").getValue(); - } else { - System.out.println("Job Failed!!!"); - } - - return -1; - } - - @Override - public int run(String[] args) throws Exception { - return (int) runJob(args); - } - - public static class TextToMutationMapper extends Mapper<LongWritable, Text, Text, Mutation> { - protected RDFParser parser; - private String prefix; - private RDFFormat rdfFormat; - protected Text spo_table; - private Text po_table; - private Text osp_table; - private byte[] cv = AccumuloRdfConstants.EMPTY_CV.getExpression(); - - public TextToMutationMapper() { - } - - @Override - protected void setup(final Context context) throws IOException, InterruptedException { - super.setup(context); - Configuration conf = context.getConfiguration(); - prefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null); - if (prefix != null) { - RdfCloudTripleStoreConstants.prefixTables(prefix); - } - - spo_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); - po_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX); - osp_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX); - - final String cv_s = conf.get(MRUtils.AC_CV_PROP); - if (cv_s != null) - cv = cv_s.getBytes(); - - rdfFormat = RDFFormat.valueOf(conf.get(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.toString())); - parser = Rio.createParser(rdfFormat); - RyaTripleContext tripleContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf)); - final RyaTableMutationsFactory mut = new RyaTableMutationsFactory(tripleContext); - - parser.setRDFHandler(new RDFHandler() { - - @Override - public void startRDF() throws RDFHandlerException { - - } - - @Override - public void endRDF() throws RDFHandlerException { - - } - - @Override - public void handleNamespace(String s, String s1) throws RDFHandlerException { - - } - - @Override - public void handleStatement(Statement statement) throws RDFHandlerException { - try { - RyaStatement ryaStatement = RdfToRyaConversions.convertStatement(statement); - if(ryaStatement.getColumnVisibility() == null) { - ryaStatement.setColumnVisibility(cv); - } - Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutationMap = - mut.serialize(ryaStatement); - Collection<Mutation> spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO); - Collection<Mutation> po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO); - Collection<Mutation> osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP); - - for (Mutation m : spo) { - context.write(spo_table, m); - } - for (Mutation m : po) { - context.write(po_table, m); - } - for (Mutation m : osp) { - context.write(osp_table, m); - } - } catch (Exception e) { - throw new RDFHandlerException(e); - } - } - - @Override - public void handleComment(String s) throws RDFHandlerException { - - } - }); - } - - @Override - protected void map(LongWritable key, Text value, final Context context) throws IOException, InterruptedException { - try { - parser.parse(new StringReader(value.toString()), ""); - } catch (Exception e) { - throw new IOException(e); - } - } - - } -} - 
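[Editor's note] RdfFileInputByLineTool (removed above) depends on TextInputFormat delivering one line per map() call, so it only works for line-oriented serializations such as N-Triples; the mapper re-runs the Rio parser on each line and fans the resulting mutations out to the SPO, PO and OSP tables. The per-line parsing step looks roughly like the standalone sketch below (illustrative only; a StatementCollector stands in for the tool's mutation-writing RDFHandler, and the example triple is a placeholder):

    import java.io.StringReader;
    import org.openrdf.model.Statement;
    import org.openrdf.rio.RDFFormat;
    import org.openrdf.rio.RDFParser;
    import org.openrdf.rio.Rio;
    import org.openrdf.rio.helpers.StatementCollector;

    public class ParseSingleLine {
        public static void main(String[] args) throws Exception {
            // One N-Triples statement per input line, exactly what the mapper receives
            String line = "<http://example.org/s> <http://example.org/p> \"o\" .";
            RDFParser parser = Rio.createParser(RDFFormat.NTRIPLES);
            StatementCollector collector = new StatementCollector();
            parser.setRDFHandler(collector);
            parser.parse(new StringReader(line), "");  // same parse call the mapper makes
            for (Statement st : collector.getStatements()) {
                System.out.println(st.getSubject() + " " + st.getPredicate() + " " + st.getObject());
            }
        }
    }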
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java deleted file mode 100644 index f20dfe3..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java +++ /dev/null @@ -1,146 +0,0 @@ -package mvm.rya.accumulo.mr.fileinput; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import java.io.IOException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.mr.utils.MRUtils; -import mvm.rya.api.domain.utils.RyaStatementWritable; -import mvm.rya.api.resolver.RdfToRyaConversions; -import mvm.rya.api.resolver.RyaTripleContext; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.openrdf.model.Statement; -import org.openrdf.rio.RDFFormat; -import org.openrdf.rio.RDFHandler; -import org.openrdf.rio.RDFHandlerException; -import org.openrdf.rio.RDFParser; -import org.openrdf.rio.Rio; - -/** - * Be able to input multiple rdf formatted files. Convert from rdf format to statements. 
- * Class RdfFileInputFormat - * Date: May 16, 2011 - * Time: 2:11:24 PM - */ -public class RdfFileInputFormat extends FileInputFormat<LongWritable, RyaStatementWritable> { - - @Override - public RecordReader<LongWritable, RyaStatementWritable> createRecordReader(InputSplit inputSplit, - TaskAttemptContext taskAttemptContext) - throws IOException, InterruptedException { - return new RdfFileRecordReader(); - } - - private class RdfFileRecordReader extends RecordReader<LongWritable, RyaStatementWritable> implements RDFHandler { - - boolean closed = false; - long count = 0; - BlockingQueue<RyaStatementWritable> queue = new LinkedBlockingQueue<RyaStatementWritable>(); - int total = 0; - private RyaTripleContext tripleContext; - - - @Override - public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - FileSplit fileSplit = (FileSplit) inputSplit; - Configuration conf = taskAttemptContext.getConfiguration(); - String rdfForm_s = conf.get(MRUtils.FORMAT_PROP, RDFFormat.RDFXML.getName()); - RDFFormat rdfFormat = RDFFormat.valueOf(rdfForm_s); - tripleContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf)); - - Path file = fileSplit.getPath(); - FileSystem fs = file.getFileSystem(conf); - FSDataInputStream fileIn = fs.open(fileSplit.getPath()); - - RDFParser rdfParser = Rio.createParser(rdfFormat); - rdfParser.setRDFHandler(this); - try { - rdfParser.parse(fileIn, ""); - } catch (Exception e) { - throw new IOException(e); - } - fileIn.close(); - total = queue.size(); - //TODO: Make this threaded so that you don't hold too many statements before sending them - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - return queue.size() > 0; - } - - @Override - public LongWritable getCurrentKey() throws IOException, InterruptedException { - return new LongWritable(count++); - } - - @Override - public RyaStatementWritable getCurrentValue() throws IOException, InterruptedException { - return queue.poll(); - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return ((float) (total - queue.size())) / ((float) total); - } - - @Override - public void close() throws IOException { - closed = true; - } - - @Override - public void startRDF() throws RDFHandlerException { - } - - @Override - public void endRDF() throws RDFHandlerException { - } - - @Override - public void handleNamespace(String s, String s1) throws RDFHandlerException { - } - - @Override - public void handleStatement(Statement statement) throws RDFHandlerException { - queue.add(new RyaStatementWritable(RdfToRyaConversions.convertStatement(statement), tripleContext)); - } - - @Override - public void handleComment(String s) throws RDFHandlerException { - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java deleted file mode 100644 index 673d65f..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java +++ /dev/null @@ -1,175 +0,0 @@ -package mvm.rya.accumulo.mr.fileinput; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV; - -import java.io.IOException; -import java.util.Collection; -import java.util.Date; -import java.util.Map; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.RyaTableMutationsFactory; -import mvm.rya.accumulo.mr.AbstractAccumuloMRTool; -import mvm.rya.accumulo.mr.utils.MRUtils; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.utils.RyaStatementWritable; -import mvm.rya.api.resolver.RyaTripleContext; - -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.data.Mutation; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.openrdf.rio.RDFFormat; - -/** - * Do bulk import of rdf files - * Class RdfFileInputTool - * Date: May 16, 2011 - * Time: 3:12:16 PM - */ -public class RdfFileInputTool extends AbstractAccumuloMRTool implements Tool { - - private String format = RDFFormat.RDFXML.getName(); - - public static void main(String[] args) { - try { - ToolRunner.run(new Configuration(), new RdfFileInputTool(), args); - } catch (Exception e) { - e.printStackTrace(); - } - } - - public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException, AccumuloSecurityException { - conf.set(MRUtils.JOB_NAME_PROP, "Rdf File Input"); - //faster - init(); - format = conf.get(MRUtils.FORMAT_PROP, format); - conf.set(MRUtils.FORMAT_PROP, format); - - String inputPath = conf.get(MRUtils.INPUT_PATH, args[0]); - - Job job = new Job(conf); - job.setJarByClass(RdfFileInputTool.class); - - // set up cloudbase input - job.setInputFormatClass(RdfFileInputFormat.class); - RdfFileInputFormat.addInputPath(job, new Path(inputPath)); - - // set input output of the particular job - job.setMapOutputKeyClass(LongWritable.class); - job.setMapOutputValueClass(RyaStatementWritable.class); - - setupOutputFormat(job, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); - - // set mapper and reducer classes - job.setMapperClass(StatementToMutationMapper.class); - job.setNumReduceTasks(0); - - // Submit the job - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int exitCode = job.waitForCompletion(true) ? 
0 : 1; - - if (exitCode == 0) { - Date end_time = new Date(); - System.out.println("Job ended: " + end_time); - System.out.println("The job took " - + (end_time.getTime() - startTime.getTime()) / 1000 - + " seconds."); - return job - .getCounters() - .findCounter("org.apache.hadoop.mapred.Task$Counter", - "REDUCE_OUTPUT_RECORDS").getValue(); - } else { - System.out.println("Job Failed!!!"); - } - - return -1; - } - - @Override - public int run(String[] args) throws Exception { - runJob(args); - return 0; - } - - public static class StatementToMutationMapper extends Mapper<LongWritable, RyaStatementWritable, Text, Mutation> { - protected String tablePrefix; - protected Text spo_table; - protected Text po_table; - protected Text osp_table; - private byte[] cv = EMPTY_CV.getExpression(); - RyaTableMutationsFactory mut; - - public StatementToMutationMapper() { - } - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - Configuration conf = context.getConfiguration(); - mut = new RyaTableMutationsFactory(RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf))); - tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF); - spo_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); - po_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX); - osp_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX); - - final String cv_s = conf.get(MRUtils.AC_CV_PROP); - if (cv_s != null) - cv = cv_s.getBytes(); - } - - @Override - protected void map(LongWritable key, RyaStatementWritable value, Context context) throws IOException, InterruptedException { - RyaStatement statement = value.getRyaStatement(); - if (statement.getColumnVisibility() == null) { - statement.setColumnVisibility(cv); - } - Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutationMap = - mut.serialize(statement); - Collection<Mutation> spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO); - Collection<Mutation> po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO); - Collection<Mutation> osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP); - - for (Mutation m : spo) { - context.write(spo_table, m); - } - for (Mutation m : po) { - context.write(po_table, m); - } - for (Mutation m : osp) { - context.write(osp_table, m); - } - } - - } -} - http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/upgrade/Upgrade322Tool.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/upgrade/Upgrade322Tool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/upgrade/Upgrade322Tool.java deleted file mode 100644 index 89f0aa5..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/upgrade/Upgrade322Tool.java +++ /dev/null @@ -1,240 +0,0 @@ -package mvm.rya.accumulo.mr.upgrade; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import mvm.rya.accumulo.mr.AbstractAccumuloMRTool; -import mvm.rya.accumulo.mr.utils.MRUtils; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.user.RegExFilter; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.calrissian.mango.types.LexiTypeEncoders; -import org.calrissian.mango.types.TypeEncoder; - -import java.io.IOException; -import java.util.Date; - -import static mvm.rya.api.RdfCloudTripleStoreConstants.*; - -/** - */ -public class Upgrade322Tool extends AbstractAccumuloMRTool implements Tool { - @Override - public int run(String[] strings) throws Exception { - conf.set(MRUtils.JOB_NAME_PROP, "Upgrade to Rya 3.2.2"); - //faster - init(); - - Job job = new Job(conf); - job.setJarByClass(Upgrade322Tool.class); - - setupInputFormat(job); - AccumuloInputFormat.setInputTableName(job, tablePrefix + TBL_OSP_SUFFIX); - - //we do not need to change any row that is a string, custom, or uri type - IteratorSetting regex = new IteratorSetting(30, "regex", - RegExFilter.class); - RegExFilter.setRegexs(regex, "\\w*" + TYPE_DELIM + "[\u0003|\u0008|\u0002]", null, null, null, false); - RegExFilter.setNegate(regex, true); - - // set input output of the particular job - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(Mutation.class); - - setupOutputFormat(job, tablePrefix + - TBL_SPO_SUFFIX); - - // set mapper and reducer classes - job.setMapperClass(Upgrade322Mapper.class); - job.setReducerClass(Reducer.class); - - // Submit the job - return job.waitForCompletion(true) ? 
0 : 1; - } - - public static void main(String[] args) { - try { - ToolRunner.run(new Configuration(), new Upgrade322Tool(), args); - } catch (Exception e) { - e.printStackTrace(); - } - } - - /** - * Reading from the OSP table - */ - public static class Upgrade322Mapper extends Mapper<Key, Value, Text, Mutation> { - - private String tablePrefix; - private Text spoTable; - private Text poTable; - private Text ospTable; - - private final UpgradeObjectSerialization upgradeObjectSerialization; - - public Upgrade322Mapper() { - this(new UpgradeObjectSerialization()); - } - - public Upgrade322Mapper( - UpgradeObjectSerialization upgradeObjectSerialization) { - this.upgradeObjectSerialization = upgradeObjectSerialization; - } - - @Override - protected void setup( - Context context) throws IOException, InterruptedException { - super.setup(context); - - tablePrefix = context.getConfiguration().get( - MRUtils.TABLE_PREFIX_PROPERTY, TBL_PRFX_DEF); - spoTable = new Text(tablePrefix + TBL_SPO_SUFFIX); - poTable = new Text(tablePrefix + TBL_PO_SUFFIX); - ospTable = new Text(tablePrefix + TBL_OSP_SUFFIX); - } - - @Override - protected void map( - Key key, Value value, Context context) - throws IOException, InterruptedException { - - //read the key, expect OSP - final String row = key.getRow().toString(); - final int firstDelim = row.indexOf(DELIM); - final int secondDelim = row.indexOf(DELIM, firstDelim + 1); - final int typeDelim = row.lastIndexOf(TYPE_DELIM); - final String oldSerialization = row.substring(0, firstDelim); - char typeMarker = row.charAt(row.length() - 1); - - final String subject = row.substring(firstDelim + 1, secondDelim); - final String predicate = row.substring(secondDelim + 1, typeDelim); - final String typeSuffix = TYPE_DELIM + typeMarker; - - String newSerialization = upgradeObjectSerialization.upgrade(oldSerialization, typeMarker); - if(newSerialization == null) { - return; - } - - //write out delete Mutations - Mutation deleteOldSerialization_osp = new Mutation(key.getRow()); - deleteOldSerialization_osp.putDelete(key.getColumnFamily(), key.getColumnQualifier(), - key.getColumnVisibilityParsed()); - Mutation deleteOldSerialization_po = new Mutation(predicate + DELIM + oldSerialization + DELIM + subject + typeSuffix); - deleteOldSerialization_po.putDelete(key.getColumnFamily(), - key.getColumnQualifier(), - key.getColumnVisibilityParsed()); - Mutation deleteOldSerialization_spo = new Mutation(subject + DELIM + predicate + DELIM + oldSerialization + typeSuffix); - deleteOldSerialization_spo.putDelete(key.getColumnFamily(), key.getColumnQualifier(), - key.getColumnVisibilityParsed()); - - //write out new serialization - Mutation putNewSerialization_osp = new Mutation(newSerialization + DELIM + subject + DELIM + predicate + typeSuffix); - putNewSerialization_osp.put(key.getColumnFamily(), - key.getColumnQualifier(), - key.getColumnVisibilityParsed(), - key.getTimestamp(), value); - Mutation putNewSerialization_po = new Mutation(predicate + DELIM + newSerialization + DELIM + subject + typeSuffix); - putNewSerialization_po.put(key.getColumnFamily(), - key.getColumnQualifier(), - key.getColumnVisibilityParsed(), - key.getTimestamp(), value); - Mutation putNewSerialization_spo = new Mutation(subject + DELIM + predicate + DELIM + newSerialization + typeSuffix); - putNewSerialization_spo.put(key.getColumnFamily(), - key.getColumnQualifier(), - key.getColumnVisibilityParsed(), - key.getTimestamp(), value); - - //write out deletes to all tables - context.write(ospTable, 
deleteOldSerialization_osp); - context.write(poTable, deleteOldSerialization_po); - context.write(spoTable, deleteOldSerialization_spo); - - //write out inserts to all tables - context.write(ospTable, putNewSerialization_osp); - context.write(poTable, putNewSerialization_po); - context.write(spoTable, putNewSerialization_spo); - } - } - - public static class UpgradeObjectSerialization { - - public static final TypeEncoder<Boolean, String> - BOOLEAN_STRING_TYPE_ENCODER = LexiTypeEncoders.booleanEncoder(); - public static final TypeEncoder<Byte, String> BYTE_STRING_TYPE_ENCODER - = LexiTypeEncoders.byteEncoder(); - public static final TypeEncoder<Date, String> DATE_STRING_TYPE_ENCODER - = LexiTypeEncoders.dateEncoder(); - public static final TypeEncoder<Integer, String> - INTEGER_STRING_TYPE_ENCODER = LexiTypeEncoders.integerEncoder(); - public static final TypeEncoder<Long, String> LONG_STRING_TYPE_ENCODER - = LexiTypeEncoders.longEncoder(); - public static final TypeEncoder<Double, String> - DOUBLE_STRING_TYPE_ENCODER = LexiTypeEncoders.doubleEncoder(); - - public String upgrade(String object, int typeMarker) { - switch(typeMarker) { - case 10: //boolean - final boolean bool = Boolean.parseBoolean(object); - return BOOLEAN_STRING_TYPE_ENCODER.encode(bool); - case 9: //byte - final byte b = Byte.parseByte(object); - return BYTE_STRING_TYPE_ENCODER.encode(b); - case 4: //long - final Long lng = Long.parseLong(object); - return LONG_STRING_TYPE_ENCODER.encode(lng); - case 5: //int - final Integer i = Integer.parseInt(object); - return INTEGER_STRING_TYPE_ENCODER.encode(i); - case 6: //double - String exp = object.substring(2, 5); - char valueSign = object.charAt(0); - char expSign = object.charAt(1); - Integer expInt = Integer.parseInt(exp); - if (expSign == '-') { - expInt = 999 - expInt; - } - final String expDoubleStr = - String.format("%s%sE%s%d", valueSign, - object.substring(6), - expSign, expInt); - return DOUBLE_STRING_TYPE_ENCODER - .encode(Double.parseDouble(expDoubleStr)); - case 7: //datetime - //check to see if it is an early release that includes the exact term xsd:dateTime - final Long l = Long.MAX_VALUE - Long.parseLong(object); - Date date = new Date(l); - return DATE_STRING_TYPE_ENCODER.encode(date); - default: - return null; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloHDFSFileInputFormat.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloHDFSFileInputFormat.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloHDFSFileInputFormat.java deleted file mode 100644 index c9dac6b..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloHDFSFileInputFormat.java +++ /dev/null @@ -1,206 +0,0 @@ -package mvm.rya.accumulo.mr.utils; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.ByteSequence; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.file.FileSKVIterator; -import org.apache.accumulo.core.file.rfile.RFileOperations; -import org.apache.accumulo.core.util.ArgumentChecker; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; - -/** - * Finds the accumulo tablet files on the hdfs disk, and uses that as the input for MR jobs - * Date: 5/11/12 - * Time: 2:04 PM - */ -public class AccumuloHDFSFileInputFormat extends FileInputFormat<Key, Value> { - - public static final Range ALLRANGE = new Range(new Text("\u0000"), new Text("\uFFFD")); - - @Override - public List<InputSplit> getSplits(JobContext jobContext) throws IOException { - //read the params from AccumuloInputFormat - Configuration conf = jobContext.getConfiguration(); - Instance instance = AccumuloProps.getInstance(jobContext); - String user = AccumuloProps.getUsername(jobContext); - AuthenticationToken password = AccumuloProps.getPassword(jobContext); - String table = AccumuloProps.getTablename(jobContext); - ArgumentChecker.notNull(instance); - ArgumentChecker.notNull(table); - - //find the files necessary - try { - AccumuloConfiguration acconf = instance.getConfiguration(); - FileSystem fs = FileSystem.get(conf); - Connector connector = instance.getConnector(user, password); - TableOperations tos = connector.tableOperations(); - String tableId = tos.tableIdMap().get(table); - String filePrefix = acconf.get(Property.INSTANCE_DFS_DIR) + "/tables/" + tableId; - System.out.println(filePrefix); - - Scanner scanner = 
connector.createScanner("!METADATA", Constants.NO_AUTHS); //TODO: auths? - scanner.setRange(new Range(new Text(tableId + "\u0000"), new Text(tableId + "\uFFFD"))); - scanner.fetchColumnFamily(new Text("file")); - List<String> files = new ArrayList<String>(); - List<InputSplit> fileSplits = new ArrayList<InputSplit>(); - Job job = new Job(conf); - for (Map.Entry<Key, Value> entry : scanner) { - String file = filePrefix + entry.getKey().getColumnQualifier().toString(); - files.add(file); - Path path = new Path(file); - FileStatus fileStatus = fs.getFileStatus(path); - long len = fileStatus.getLen(); - BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, len); - fileSplits.add(new FileSplit(path, 0, len, fileBlockLocations[0].getHosts())); -// FileInputFormat.addInputPath(job, path); - } - System.out.println(files); - return fileSplits; -// return super.getSplits(job); - } catch (Exception e) { - throw new IOException(e); - } - } - - @Override - public RecordReader<Key, Value> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - return new RecordReader<Key, Value>() { - - private FileSKVIterator fileSKVIterator; - - @Override - public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - FileSplit split = (FileSplit) inputSplit; - Configuration job = taskAttemptContext.getConfiguration(); - Path file = split.getPath(); -// long start = split.getStart(); -// long length = split.getLength(); - FileSystem fs = file.getFileSystem(job); -// FSDataInputStream fileIn = fs.open(file); -// System.out.println(start); -// if (start != 0L) { -// fileIn.seek(start); -// } - Instance instance = AccumuloProps.getInstance(taskAttemptContext); - - fileSKVIterator = RFileOperations.getInstance().openReader(file.toString(), ALLRANGE, - new HashSet<ByteSequence>(), false, fs, job, instance.getConfiguration()); -// fileSKVIterator = new RFileOperations2().openReader(fileIn, length - start, job); - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - fileSKVIterator.next(); - return fileSKVIterator.hasTop(); - } - - @Override - public Key getCurrentKey() throws IOException, InterruptedException { - return fileSKVIterator.getTopKey(); - } - - @Override - public Value getCurrentValue() throws IOException, InterruptedException { - return fileSKVIterator.getTopValue(); - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return 0; - } - - @Override - public void close() throws IOException { - //To change body of implemented methods use File | Settings | File Templates. 
- } - }; - } - - public static void main(String[] args) { - try { - Job job = new Job(new Configuration()); - job.setJarByClass(AccumuloHDFSFileInputFormat.class); - Configuration conf = job.getConfiguration(); - conf.setBoolean("mapred.map.tasks.speculative.execution", false); - conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); - AccumuloInputFormat.setConnectorInfo(job, "root", new PasswordToken("secret")); - AccumuloInputFormat.setInputTableName(job, "l_spo"); - AccumuloInputFormat.setScanAuthorizations(job, Constants.NO_AUTHS); - AccumuloInputFormat.setZooKeeperInstance(job, "acu13", "stratus25:2181"); - AccumuloInputFormat.setRanges(job, Collections.singleton(ALLRANGE)); - job.setMapperClass(NullMapper.class); - job.setNumReduceTasks(0); - job.setOutputFormatClass(NullOutputFormat.class); - if (args.length == 0) { - job.setInputFormatClass(AccumuloHDFSFileInputFormat.class); - } else { - job.setInputFormatClass(AccumuloInputFormat.class); - } - job.waitForCompletion(true); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @SuppressWarnings("rawtypes") - public static class NullMapper extends Mapper { - @Override - protected void map(Object key, Object value, Context context) throws IOException, InterruptedException { - - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloProps.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloProps.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloProps.java deleted file mode 100644 index 2b89440..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloProps.java +++ /dev/null @@ -1,58 +0,0 @@ -package mvm.rya.accumulo.mr.utils; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - - -import java.io.IOException; - -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.mapreduce.InputFormatBase; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -@SuppressWarnings("rawtypes") -public class AccumuloProps extends InputFormatBase { - - @Override - public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - throw new UnsupportedOperationException("Accumulo Props just holds properties"); - } - - public static Instance getInstance(JobContext conf) { - return InputFormatBase.getInstance(conf); - } - - public static AuthenticationToken getPassword(JobContext conf) { - return InputFormatBase.getAuthenticationToken(conf); - } - - public static String getUsername(JobContext conf) { - return InputFormatBase.getPrincipal(conf); - } - - public static String getTablename(JobContext conf) { - return InputFormatBase.getInputTableName(conf); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/MRUtils.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/MRUtils.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/MRUtils.java deleted file mode 100644 index c3003d3..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/MRUtils.java +++ /dev/null @@ -1,119 +0,0 @@ -package mvm.rya.accumulo.mr.utils; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - - -import org.apache.hadoop.conf.Configuration; -import org.openrdf.model.URI; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; - -/** - * Class MRSailUtils - * Date: May 19, 2011 - * Time: 10:34:06 AM - */ -public class MRUtils { - - public static final String JOB_NAME_PROP = "mapred.job.name"; - - public static final String AC_USERNAME_PROP = "ac.username"; - public static final String AC_PWD_PROP = "ac.pwd"; - public static final String AC_ZK_PROP = "ac.zk"; - public static final String AC_INSTANCE_PROP = "ac.instance"; - public static final String AC_TTL_PROP = "ac.ttl"; - public static final String AC_TABLE_PROP = "ac.table"; - public static final String AC_AUTH_PROP = "ac.auth"; - public static final String AC_CV_PROP = "ac.cv"; - public static final String AC_MOCK_PROP = "ac.mock"; - public static final String AC_HDFS_INPUT_PROP = "ac.hdfsinput"; - public static final String HADOOP_IO_SORT_MB = "ac.hdfsinput"; - public static final String TABLE_LAYOUT_PROP = "rdf.tablelayout"; - public static final String FORMAT_PROP = "rdf.format"; - public static final String INPUT_PATH = "input"; - - public static final String NAMED_GRAPH_PROP = "rdf.graph"; - - public static final String TABLE_PREFIX_PROPERTY = "rdf.tablePrefix"; - - // rdf constants - public static final ValueFactory vf = new ValueFactoryImpl(); - public static final URI RDF_TYPE = vf.createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#", "type"); - - - // cloudbase map reduce utils - -// public static Range retrieveRange(URI entry_key, URI entry_val) throws IOException { -// ByteArrayDataOutput startRowOut = ByteStreams.newDataOutput(); -// startRowOut.write(RdfCloudTripleStoreUtils.writeValue(entry_key)); -// if (entry_val != null) { -// startRowOut.write(RdfCloudTripleStoreConstants.DELIM_BYTES); -// startRowOut.write(RdfCloudTripleStoreUtils.writeValue(entry_val)); -// } -// byte[] startrow = startRowOut.toByteArray(); -// startRowOut.write(RdfCloudTripleStoreConstants.DELIM_STOP_BYTES); -// byte[] stoprow = startRowOut.toByteArray(); -// -// Range range = new Range(new Text(startrow), new Text(stoprow)); -// return range; -// } - - - public static String getACTtl(Configuration conf) { - return conf.get(AC_TTL_PROP); - } - - public static String getACUserName(Configuration conf) { - return conf.get(AC_USERNAME_PROP); - } - - public static String getACPwd(Configuration conf) { - return conf.get(AC_PWD_PROP); - } - - public static String getACZK(Configuration conf) { - return conf.get(AC_ZK_PROP); - } - - public static String getACInstance(Configuration conf) { - return conf.get(AC_INSTANCE_PROP); - } - - public static void setACUserName(Configuration conf, String str) { - conf.set(AC_USERNAME_PROP, str); - } - - public static void setACPwd(Configuration conf, String str) { - conf.set(AC_PWD_PROP, str); - } - - public static void setACZK(Configuration conf, String str) { - conf.set(AC_ZK_PROP, str); - } - - public static void setACInstance(Configuration conf, String str) { - conf.set(AC_INSTANCE_PROP, str); - } - - public static void setACTtl(Configuration conf, String str) { - conf.set(AC_TTL_PROP, str); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java deleted file mode 100644 index 1d0d9c9..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java +++ /dev/null @@ -1,402 +0,0 @@ -package mvm.rya.accumulo.query; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import static mvm.rya.api.RdfCloudTripleStoreUtils.layoutToTable; -import info.aduna.iteration.CloseableIteration; - -import java.io.IOException; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -import mvm.rya.api.RdfCloudTripleStoreUtils; -import mvm.rya.api.domain.RyaRange; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaType; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.layout.TableLayoutStrategy; -import mvm.rya.api.persist.RyaDAOException; -import mvm.rya.api.persist.query.BatchRyaQuery; -import mvm.rya.api.persist.query.RyaQuery; -import mvm.rya.api.persist.query.RyaQueryEngine; -import mvm.rya.api.query.strategy.ByteRange; -import mvm.rya.api.query.strategy.TriplePatternStrategy; -import mvm.rya.api.resolver.RyaContext; -import mvm.rya.api.resolver.RyaTripleContext; -import mvm.rya.api.resolver.triple.TripleRowRegex; -import mvm.rya.api.utils.CloseableIterableIteration; - -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.ScannerBase; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.user.RegExFilter; -import org.apache.accumulo.core.iterators.user.TimestampFilter; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.io.Text; -import org.calrissian.mango.collect.CloseableIterable; -import org.calrissian.mango.collect.CloseableIterables; -import org.calrissian.mango.collect.FluentCloseableIterable; -import org.openrdf.query.BindingSet; - -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.Iterators; - -/** - * Date: 7/17/12 - * Time: 9:28 AM - */ -public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfiguration> { - - private AccumuloRdfConfiguration configuration; - 
private Connector connector; - private RyaTripleContext ryaContext; - private final Map<TABLE_LAYOUT, KeyValueToRyaStatementFunction> keyValueToRyaStatementFunctionMap = new HashMap<TABLE_LAYOUT, KeyValueToRyaStatementFunction>(); - - public AccumuloRyaQueryEngine(Connector connector) { - this(connector, new AccumuloRdfConfiguration()); - } - - public AccumuloRyaQueryEngine(Connector connector, AccumuloRdfConfiguration conf) { - this.connector = connector; - this.configuration = conf; - ryaContext = RyaTripleContext.getInstance(conf); - keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.SPO, new KeyValueToRyaStatementFunction(TABLE_LAYOUT.SPO, ryaContext)); - keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.PO, new KeyValueToRyaStatementFunction(TABLE_LAYOUT.PO, ryaContext)); - keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.OSP, new KeyValueToRyaStatementFunction(TABLE_LAYOUT.OSP, ryaContext)); - } - - @Override - public CloseableIteration<RyaStatement, RyaDAOException> query(RyaStatement stmt, AccumuloRdfConfiguration conf) throws RyaDAOException { - if (conf == null) { - conf = configuration; - } - - RyaQuery ryaQuery = RyaQuery.builder(stmt).load(conf).build(); - CloseableIterable<RyaStatement> results = query(ryaQuery); - - return new CloseableIterableIteration<RyaStatement, RyaDAOException>(results); - } - - protected String getData(RyaType ryaType) { - return (ryaType != null) ? (ryaType.getData()) : (null); - } - - @Override - public CloseableIteration<? extends Map.Entry<RyaStatement, BindingSet>, RyaDAOException> queryWithBindingSet(Collection<Map.Entry<RyaStatement, BindingSet>> stmts, AccumuloRdfConfiguration conf) throws RyaDAOException { - if (conf == null) { - conf = configuration; - } - //query configuration - Authorizations authorizations = conf.getAuthorizations(); - Long ttl = conf.getTtl(); - Long maxResults = conf.getLimit(); - Integer maxRanges = conf.getMaxRangesForScanner(); - Integer numThreads = conf.getNumThreads(); - - //TODO: cannot span multiple tables here - try { - Collection<Range> ranges = new HashSet<Range>(); - RangeBindingSetEntries rangeMap = new RangeBindingSetEntries(); - TABLE_LAYOUT layout = null; - RyaURI context = null; - TriplePatternStrategy strategy = null; - for (Map.Entry<RyaStatement, BindingSet> stmtbs : stmts) { - RyaStatement stmt = stmtbs.getKey(); - context = stmt.getContext(); //TODO: This will be overwritten - BindingSet bs = stmtbs.getValue(); - strategy = ryaContext.retrieveStrategy(stmt); - if (strategy == null) { - throw new IllegalArgumentException("TriplePattern[" + stmt + "] not supported"); - } - - Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry = - strategy.defineRange(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), stmt.getContext(), conf); - - //use range to set scanner - //populate scanner based on authorizations, ttl - layout = entry.getKey(); - ByteRange byteRange = entry.getValue(); - Range range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd())); - ranges.add(range); - rangeMap.ranges.add(new RdfCloudTripleStoreUtils.CustomEntry<Range, BindingSet>(range, bs)); - } - //no ranges - if (layout == null) return null; - String regexSubject = conf.getRegexSubject(); - String regexPredicate = conf.getRegexPredicate(); - String regexObject = conf.getRegexObject(); - TripleRowRegex tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, null); - - String table = layoutToTable(layout, conf); - boolean useBatchScanner = ranges.size() > 
maxRanges; - RyaStatementBindingSetKeyValueIterator iterator = null; - if (useBatchScanner) { - ScannerBase scanner = connector.createBatchScanner(table, authorizations, numThreads); - ((BatchScanner) scanner).setRanges(ranges); - fillScanner(scanner, context, null, ttl, null, tripleRowRegex, conf); - iterator = new RyaStatementBindingSetKeyValueIterator(layout, ryaContext, scanner, rangeMap); - } else { - Scanner scannerBase = null; - Iterator<Map.Entry<Key, Value>>[] iters = new Iterator[ranges.size()]; - int i = 0; - for (Range range : ranges) { - scannerBase = connector.createScanner(table, authorizations); - scannerBase.setRange(range); - fillScanner(scannerBase, context, null, ttl, null, tripleRowRegex, conf); - iters[i] = scannerBase.iterator(); - i++; - } - iterator = new RyaStatementBindingSetKeyValueIterator(layout, Iterators.concat(iters), rangeMap, ryaContext); - } - if (maxResults != null) { - iterator.setMaxResults(maxResults); - } - return iterator; - } catch (Exception e) { - throw new RyaDAOException(e); - } - - } - - @Override - public CloseableIteration<RyaStatement, RyaDAOException> batchQuery(Collection<RyaStatement> stmts, AccumuloRdfConfiguration conf) - throws RyaDAOException { - if (conf == null) { - conf = configuration; - } - - BatchRyaQuery batchRyaQuery = BatchRyaQuery.builder(stmts).load(conf).build(); - CloseableIterable<RyaStatement> results = query(batchRyaQuery); - - return new CloseableIterableIteration<RyaStatement, RyaDAOException>(results); - } - - @Override - public CloseableIterable<RyaStatement> query(RyaQuery ryaQuery) throws RyaDAOException { - Preconditions.checkNotNull(ryaQuery); - RyaStatement stmt = ryaQuery.getQuery(); - Preconditions.checkNotNull(stmt); - - //query configuration - String[] auths = ryaQuery.getAuths(); - Authorizations authorizations = auths != null ? 
new Authorizations(auths) : configuration.getAuthorizations(); - Long ttl = ryaQuery.getTtl(); - Long currentTime = ryaQuery.getCurrentTime(); - Long maxResults = ryaQuery.getMaxResults(); - Integer batchSize = ryaQuery.getBatchSize(); - String regexSubject = ryaQuery.getRegexSubject(); - String regexPredicate = ryaQuery.getRegexPredicate(); - String regexObject = ryaQuery.getRegexObject(); - TableLayoutStrategy tableLayoutStrategy = configuration.getTableLayoutStrategy(); - - try { - //find triple pattern range - TriplePatternStrategy strategy = ryaContext.retrieveStrategy(stmt); - TABLE_LAYOUT layout; - Range range; - RyaURI subject = stmt.getSubject(); - RyaURI predicate = stmt.getPredicate(); - RyaType object = stmt.getObject(); - RyaURI context = stmt.getContext(); - String qualifier = stmt.getQualifer(); - TripleRowRegex tripleRowRegex = null; - if (strategy != null) { - //otherwise, full table scan is supported - Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry = - strategy.defineRange(subject, predicate, object, context, null); - layout = entry.getKey(); - ByteRange byteRange = entry.getValue(); - range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd())); - - byte[] objectTypeInfo = null; - if (object != null) { - //TODO: Not good to serialize this twice - if (object instanceof RyaRange) { - objectTypeInfo = RyaContext.getInstance().serializeType(((RyaRange) object).getStart())[1]; - } else { - objectTypeInfo = RyaContext.getInstance().serializeType(object)[1]; - } - } - - tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, objectTypeInfo); - } else { - range = new Range(); - layout = TABLE_LAYOUT.SPO; - } - - //use range to set scanner - //populate scanner based on authorizations, ttl - String table = layoutToTable(layout, tableLayoutStrategy); - Scanner scanner = connector.createScanner(table, authorizations); - scanner.setRange(range); - if (batchSize != null) { - scanner.setBatchSize(batchSize); - } - fillScanner(scanner, context, qualifier, ttl, currentTime, tripleRowRegex, ryaQuery.getConf()); - - FluentCloseableIterable<RyaStatement> results = FluentCloseableIterable.from(new ScannerBaseCloseableIterable(scanner)) - .transform(keyValueToRyaStatementFunctionMap.get(layout)); - if (maxResults != null) { - results = results.limit(maxResults.intValue()); - } - - return results; - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - @Override - public CloseableIterable<RyaStatement> query(BatchRyaQuery ryaQuery) throws RyaDAOException { - Preconditions.checkNotNull(ryaQuery); - Iterable<RyaStatement> stmts = ryaQuery.getQueries(); - Preconditions.checkNotNull(stmts); - - //query configuration - String[] auths = ryaQuery.getAuths(); - final Authorizations authorizations = auths != null ? 
new Authorizations(auths) : configuration.getAuthorizations(); - final Long ttl = ryaQuery.getTtl(); - Long currentTime = ryaQuery.getCurrentTime(); - Long maxResults = ryaQuery.getMaxResults(); - Integer batchSize = ryaQuery.getBatchSize(); - Integer numQueryThreads = ryaQuery.getNumQueryThreads(); - String regexSubject = ryaQuery.getRegexSubject(); - String regexPredicate = ryaQuery.getRegexPredicate(); - String regexObject = ryaQuery.getRegexObject(); - TableLayoutStrategy tableLayoutStrategy = configuration.getTableLayoutStrategy(); - int maxRanges = ryaQuery.getMaxRanges(); - - //TODO: cannot span multiple tables here - try { - Collection<Range> ranges = new HashSet<Range>(); - TABLE_LAYOUT layout = null; - RyaURI context = null; - TriplePatternStrategy strategy = null; - for (RyaStatement stmt : stmts) { - context = stmt.getContext(); //TODO: This will be overwritten - strategy = ryaContext.retrieveStrategy(stmt); - if (strategy == null) { - throw new IllegalArgumentException("TriplePattern[" + stmt + "] not supported"); - } - - Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry = - strategy.defineRange(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), stmt.getContext(), null); - - //use range to set scanner - //populate scanner based on authorizations, ttl - layout = entry.getKey(); - ByteRange byteRange = entry.getValue(); - Range range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd())); - ranges.add(range); - } - //no ranges - if (layout == null) throw new IllegalArgumentException("No table layout specified"); - - final TripleRowRegex tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, null); - - final String table = layoutToTable(layout, tableLayoutStrategy); - boolean useBatchScanner = ranges.size() > maxRanges; - FluentCloseableIterable<RyaStatement> results = null; - if (useBatchScanner) { - BatchScanner scanner = connector.createBatchScanner(table, authorizations, numQueryThreads); - scanner.setRanges(ranges); - fillScanner(scanner, context, null, ttl, null, tripleRowRegex, ryaQuery.getConf()); - results = FluentCloseableIterable.from(new ScannerBaseCloseableIterable(scanner)).transform(keyValueToRyaStatementFunctionMap.get(layout)); - } else { - final RyaURI fcontext = context; - final RdfCloudTripleStoreConfiguration fconf = ryaQuery.getConf(); - FluentIterable<RyaStatement> fluent = FluentIterable.from(ranges).transformAndConcat(new Function<Range, Iterable<Map.Entry<Key, Value>>>() { - @Override - public Iterable<Map.Entry<Key, Value>> apply(Range range) { - try { - Scanner scanner = connector.createScanner(table, authorizations); - scanner.setRange(range); - fillScanner(scanner, fcontext, null, ttl, null, tripleRowRegex, fconf); - return scanner; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }).transform(keyValueToRyaStatementFunctionMap.get(layout)); - - results = FluentCloseableIterable.from(CloseableIterables.wrap(fluent)); - } - if (maxResults != null) { - results = results.limit(maxResults.intValue()); - } - return results; - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - protected void fillScanner(ScannerBase scanner, RyaURI context, String qualifier, Long ttl, Long currentTime, TripleRowRegex tripleRowRegex, RdfCloudTripleStoreConfiguration conf) throws IOException { - if (context != null && qualifier != null) { - scanner.fetchColumn(new Text(context.getData()), new Text(qualifier)); - } else if (context != null) { - 
scanner.fetchColumnFamily(new Text(context.getData())); - } else if (qualifier != null) { - IteratorSetting setting = new IteratorSetting(8, "riq", RegExFilter.class.getName()); - RegExFilter.setRegexs(setting, null, null, qualifier, null, false); - scanner.addScanIterator(setting); - } - if (ttl != null) { - IteratorSetting setting = new IteratorSetting(9, "fi", TimestampFilter.class.getName()); - TimestampFilter.setStart(setting, System.currentTimeMillis() - ttl, true); - if(currentTime != null){ - TimestampFilter.setStart(setting, currentTime - ttl, true); - TimestampFilter.setEnd(setting, currentTime, true); - } - scanner.addScanIterator(setting); - } - if (tripleRowRegex != null) { - IteratorSetting setting = new IteratorSetting(11, "ri", RegExFilter.class.getName()); - String regex = tripleRowRegex.getRow(); - RegExFilter.setRegexs(setting, regex, null, null, null, false); - scanner.addScanIterator(setting); - } - } - - @Override - public void setConf(AccumuloRdfConfiguration conf) { - this.configuration = conf; - } - - @Override - public AccumuloRdfConfiguration getConf() { - return configuration; - } -}
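For readers following this removal, below is a minimal sketch of how the deleted AccumuloRyaQueryEngine was typically driven, pieced together from the constructors and the RyaQuery builder calls visible in the diff above. The ZooKeeper host, credentials, and example URI are placeholders, and the RyaStatement/RyaURI constructors are assumed from the wider Rya API rather than shown in this commit:

import mvm.rya.accumulo.AccumuloRdfConfiguration;
import mvm.rya.accumulo.query.AccumuloRyaQueryEngine;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.domain.RyaURI;
import mvm.rya.api.persist.query.RyaQuery;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.calrissian.mango.collect.CloseableIterable;

public class QueryEngineUsageSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder connection settings; substitute the real instance name, ZooKeepers, and credentials.
        Connector connector = new ZooKeeperInstance("instance", "zoo1:2181")
                .getConnector("root", new PasswordToken("root"));

        AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
        AccumuloRyaQueryEngine engine = new AccumuloRyaQueryEngine(connector, conf);

        // Triple pattern: every statement with this subject (predicate/object left as wildcards).
        // The (subject, predicate, object) constructor is assumed, not part of this commit.
        RyaStatement pattern = new RyaStatement(new RyaURI("urn:example#alice"), null, null);

        // Same builder the engine uses internally for query(RyaStatement, conf).
        RyaQuery query = RyaQuery.builder(pattern).load(conf).build();
        CloseableIterable<RyaStatement> results = engine.query(query);
        try {
            for (RyaStatement stmt : results) {
                System.out.println(stmt);
            }
        } finally {
            results.close(); // releases the underlying Accumulo scanner
        }
    }
}

Note that TTL, column-qualifier, and row-regex filtering were applied inside fillScanner via TimestampFilter and RegExFilter scan iterators, so a caller only sets those options on the RyaQuery or configuration rather than on the scanner directly.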
