http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java deleted file mode 100644 index f44b6aa..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java +++ /dev/null @@ -1,175 +0,0 @@ -package mvm.rya.accumulo.mr.fileinput; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV; - -import java.io.IOException; -import java.util.Collection; -import java.util.Date; -import java.util.Map; - -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.data.Mutation; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.openrdf.rio.RDFFormat; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.RyaTableMutationsFactory; -import mvm.rya.accumulo.mr.AbstractAccumuloMRTool; -import mvm.rya.accumulo.mr.RyaStatementWritable; -import mvm.rya.accumulo.mr.utils.MRUtils; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.resolver.RyaTripleContext; - -/** - * Do bulk import of rdf files - * Class RdfFileInputTool - * Date: May 16, 2011 - * Time: 3:12:16 PM - */ -public class RdfFileInputTool extends AbstractAccumuloMRTool implements Tool { - - private String format = RDFFormat.RDFXML.getName(); - - public static void main(String[] args) { - try { - ToolRunner.run(new Configuration(), new RdfFileInputTool(), args); - } catch (Exception e) { - e.printStackTrace(); - } - } - - public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException, AccumuloSecurityException { - conf.set(MRUtils.JOB_NAME_PROP, "Rdf File Input"); - //faster - init(); - format = conf.get(MRUtils.FORMAT_PROP, format); - conf.set(MRUtils.FORMAT_PROP, format); - - String inputPath = conf.get(MRUtils.INPUT_PATH, args[0]); - - Job job = new Job(conf); - job.setJarByClass(RdfFileInputTool.class); - - // set up cloudbase input - job.setInputFormatClass(RdfFileInputFormat.class); - RdfFileInputFormat.addInputPath(job, new Path(inputPath)); - - // set input output of the particular job - 
job.setMapOutputKeyClass(LongWritable.class); - job.setMapOutputValueClass(RyaStatementWritable.class); - - setupOutputFormat(job, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); - - // set mapper and reducer classes - job.setMapperClass(StatementToMutationMapper.class); - job.setNumReduceTasks(0); - - // Submit the job - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int exitCode = job.waitForCompletion(true) ? 0 : 1; - - if (exitCode == 0) { - Date end_time = new Date(); - System.out.println("Job ended: " + end_time); - System.out.println("The job took " - + (end_time.getTime() - startTime.getTime()) / 1000 - + " seconds."); - return job - .getCounters() - .findCounter("org.apache.hadoop.mapred.Task$Counter", - "REDUCE_OUTPUT_RECORDS").getValue(); - } else { - System.out.println("Job Failed!!!"); - } - - return -1; - } - - @Override - public int run(String[] args) throws Exception { - runJob(args); - return 0; - } - - public static class StatementToMutationMapper extends Mapper<LongWritable, RyaStatementWritable, Text, Mutation> { - protected String tablePrefix; - protected Text spo_table; - protected Text po_table; - protected Text osp_table; - private byte[] cv = EMPTY_CV.getExpression(); - RyaTableMutationsFactory mut; - - public StatementToMutationMapper() { - } - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - Configuration conf = context.getConfiguration(); - mut = new RyaTableMutationsFactory(RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf))); - tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF); - spo_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); - po_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX); - osp_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX); - - final String cv_s = conf.get(MRUtils.AC_CV_PROP); - if (cv_s != null) - cv = cv_s.getBytes(); - } - - @Override - protected void map(LongWritable key, RyaStatementWritable value, Context context) throws IOException, InterruptedException { - RyaStatement statement = value.getRyaStatement(); - if (statement.getColumnVisibility() == null) { - statement.setColumnVisibility(cv); - } - Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutationMap = - mut.serialize(statement); - Collection<Mutation> spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO); - Collection<Mutation> po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO); - Collection<Mutation> osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP); - - for (Mutation m : spo) { - context.write(spo_table, m); - } - for (Mutation m : po) { - context.write(po_table, m); - } - for (Mutation m : osp) { - context.write(osp_table, m); - } - } - - } -} -
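For reference, the deleted RdfFileInputTool above was configured entirely through -D system properties (ac.instance, ac.username, rdf.format, and the other keys defined in MRUtils) plus a trailing input path. A minimal sketch of an invocation, modeled on the RdfFileInputToolTest removed later in this commit (the wrapper class name, instance name, credentials, table prefix, and file path below are placeholder values taken from that test, not part of the deleted code), would have looked roughly like this:

    import mvm.rya.accumulo.mr.fileinput.RdfFileInputTool;
    import org.openrdf.rio.RDFFormat;

    // Hypothetical wrapper class, only for this sketch.
    public class RdfFileInputToolExample {
        public static void main(String[] args) throws Exception {
            // ToolRunner's GenericOptionsParser turns the -D arguments into
            // Configuration entries; the final argument is the RDF input path.
            // ac.mock=true targets an in-memory MockInstance, as the unit tests do.
            RdfFileInputTool.main(new String[]{
                    "-Dac.mock=true",
                    "-Dac.instance=myinstance",
                    "-Dac.username=user",
                    "-Dac.pwd=pwd",
                    "-Drdf.tablePrefix=t_",
                    "-Drdf.format=" + RDFFormat.NTRIPLES.getName(),
                    "src/test/resources/test.ntriples",
            });
        }
    }

With ac.mock=true the job writes into a mock Accumulo instance, which is how the tests in this commit exercised the tool end to end before it was removed.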
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/upgrade/Upgrade322Tool.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/upgrade/Upgrade322Tool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/upgrade/Upgrade322Tool.java deleted file mode 100644 index 89f0aa5..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/upgrade/Upgrade322Tool.java +++ /dev/null @@ -1,240 +0,0 @@ -package mvm.rya.accumulo.mr.upgrade; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import mvm.rya.accumulo.mr.AbstractAccumuloMRTool; -import mvm.rya.accumulo.mr.utils.MRUtils; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.user.RegExFilter; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.calrissian.mango.types.LexiTypeEncoders; -import org.calrissian.mango.types.TypeEncoder; - -import java.io.IOException; -import java.util.Date; - -import static mvm.rya.api.RdfCloudTripleStoreConstants.*; - -/** - */ -public class Upgrade322Tool extends AbstractAccumuloMRTool implements Tool { - @Override - public int run(String[] strings) throws Exception { - conf.set(MRUtils.JOB_NAME_PROP, "Upgrade to Rya 3.2.2"); - //faster - init(); - - Job job = new Job(conf); - job.setJarByClass(Upgrade322Tool.class); - - setupInputFormat(job); - AccumuloInputFormat.setInputTableName(job, tablePrefix + TBL_OSP_SUFFIX); - - //we do not need to change any row that is a string, custom, or uri type - IteratorSetting regex = new IteratorSetting(30, "regex", - RegExFilter.class); - RegExFilter.setRegexs(regex, "\\w*" + TYPE_DELIM + "[\u0003|\u0008|\u0002]", null, null, null, false); - RegExFilter.setNegate(regex, true); - - // set input output of the particular job - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(Mutation.class); - - setupOutputFormat(job, tablePrefix + - TBL_SPO_SUFFIX); - - // set mapper and reducer classes - job.setMapperClass(Upgrade322Mapper.class); - job.setReducerClass(Reducer.class); - - // Submit the job - return job.waitForCompletion(true) ? 
0 : 1; - } - - public static void main(String[] args) { - try { - ToolRunner.run(new Configuration(), new Upgrade322Tool(), args); - } catch (Exception e) { - e.printStackTrace(); - } - } - - /** - * Reading from the OSP table - */ - public static class Upgrade322Mapper extends Mapper<Key, Value, Text, Mutation> { - - private String tablePrefix; - private Text spoTable; - private Text poTable; - private Text ospTable; - - private final UpgradeObjectSerialization upgradeObjectSerialization; - - public Upgrade322Mapper() { - this(new UpgradeObjectSerialization()); - } - - public Upgrade322Mapper( - UpgradeObjectSerialization upgradeObjectSerialization) { - this.upgradeObjectSerialization = upgradeObjectSerialization; - } - - @Override - protected void setup( - Context context) throws IOException, InterruptedException { - super.setup(context); - - tablePrefix = context.getConfiguration().get( - MRUtils.TABLE_PREFIX_PROPERTY, TBL_PRFX_DEF); - spoTable = new Text(tablePrefix + TBL_SPO_SUFFIX); - poTable = new Text(tablePrefix + TBL_PO_SUFFIX); - ospTable = new Text(tablePrefix + TBL_OSP_SUFFIX); - } - - @Override - protected void map( - Key key, Value value, Context context) - throws IOException, InterruptedException { - - //read the key, expect OSP - final String row = key.getRow().toString(); - final int firstDelim = row.indexOf(DELIM); - final int secondDelim = row.indexOf(DELIM, firstDelim + 1); - final int typeDelim = row.lastIndexOf(TYPE_DELIM); - final String oldSerialization = row.substring(0, firstDelim); - char typeMarker = row.charAt(row.length() - 1); - - final String subject = row.substring(firstDelim + 1, secondDelim); - final String predicate = row.substring(secondDelim + 1, typeDelim); - final String typeSuffix = TYPE_DELIM + typeMarker; - - String newSerialization = upgradeObjectSerialization.upgrade(oldSerialization, typeMarker); - if(newSerialization == null) { - return; - } - - //write out delete Mutations - Mutation deleteOldSerialization_osp = new Mutation(key.getRow()); - deleteOldSerialization_osp.putDelete(key.getColumnFamily(), key.getColumnQualifier(), - key.getColumnVisibilityParsed()); - Mutation deleteOldSerialization_po = new Mutation(predicate + DELIM + oldSerialization + DELIM + subject + typeSuffix); - deleteOldSerialization_po.putDelete(key.getColumnFamily(), - key.getColumnQualifier(), - key.getColumnVisibilityParsed()); - Mutation deleteOldSerialization_spo = new Mutation(subject + DELIM + predicate + DELIM + oldSerialization + typeSuffix); - deleteOldSerialization_spo.putDelete(key.getColumnFamily(), key.getColumnQualifier(), - key.getColumnVisibilityParsed()); - - //write out new serialization - Mutation putNewSerialization_osp = new Mutation(newSerialization + DELIM + subject + DELIM + predicate + typeSuffix); - putNewSerialization_osp.put(key.getColumnFamily(), - key.getColumnQualifier(), - key.getColumnVisibilityParsed(), - key.getTimestamp(), value); - Mutation putNewSerialization_po = new Mutation(predicate + DELIM + newSerialization + DELIM + subject + typeSuffix); - putNewSerialization_po.put(key.getColumnFamily(), - key.getColumnQualifier(), - key.getColumnVisibilityParsed(), - key.getTimestamp(), value); - Mutation putNewSerialization_spo = new Mutation(subject + DELIM + predicate + DELIM + newSerialization + typeSuffix); - putNewSerialization_spo.put(key.getColumnFamily(), - key.getColumnQualifier(), - key.getColumnVisibilityParsed(), - key.getTimestamp(), value); - - //write out deletes to all tables - context.write(ospTable, 
deleteOldSerialization_osp); - context.write(poTable, deleteOldSerialization_po); - context.write(spoTable, deleteOldSerialization_spo); - - //write out inserts to all tables - context.write(ospTable, putNewSerialization_osp); - context.write(poTable, putNewSerialization_po); - context.write(spoTable, putNewSerialization_spo); - } - } - - public static class UpgradeObjectSerialization { - - public static final TypeEncoder<Boolean, String> - BOOLEAN_STRING_TYPE_ENCODER = LexiTypeEncoders.booleanEncoder(); - public static final TypeEncoder<Byte, String> BYTE_STRING_TYPE_ENCODER - = LexiTypeEncoders.byteEncoder(); - public static final TypeEncoder<Date, String> DATE_STRING_TYPE_ENCODER - = LexiTypeEncoders.dateEncoder(); - public static final TypeEncoder<Integer, String> - INTEGER_STRING_TYPE_ENCODER = LexiTypeEncoders.integerEncoder(); - public static final TypeEncoder<Long, String> LONG_STRING_TYPE_ENCODER - = LexiTypeEncoders.longEncoder(); - public static final TypeEncoder<Double, String> - DOUBLE_STRING_TYPE_ENCODER = LexiTypeEncoders.doubleEncoder(); - - public String upgrade(String object, int typeMarker) { - switch(typeMarker) { - case 10: //boolean - final boolean bool = Boolean.parseBoolean(object); - return BOOLEAN_STRING_TYPE_ENCODER.encode(bool); - case 9: //byte - final byte b = Byte.parseByte(object); - return BYTE_STRING_TYPE_ENCODER.encode(b); - case 4: //long - final Long lng = Long.parseLong(object); - return LONG_STRING_TYPE_ENCODER.encode(lng); - case 5: //int - final Integer i = Integer.parseInt(object); - return INTEGER_STRING_TYPE_ENCODER.encode(i); - case 6: //double - String exp = object.substring(2, 5); - char valueSign = object.charAt(0); - char expSign = object.charAt(1); - Integer expInt = Integer.parseInt(exp); - if (expSign == '-') { - expInt = 999 - expInt; - } - final String expDoubleStr = - String.format("%s%sE%s%d", valueSign, - object.substring(6), - expSign, expInt); - return DOUBLE_STRING_TYPE_ENCODER - .encode(Double.parseDouble(expDoubleStr)); - case 7: //datetime - //check to see if it is an early release that includes the exact term xsd:dateTime - final Long l = Long.MAX_VALUE - Long.parseLong(object); - Date date = new Date(l); - return DATE_STRING_TYPE_ENCODER.encode(date); - default: - return null; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloHDFSFileInputFormat.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloHDFSFileInputFormat.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloHDFSFileInputFormat.java deleted file mode 100644 index c9dac6b..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloHDFSFileInputFormat.java +++ /dev/null @@ -1,206 +0,0 @@ -package mvm.rya.accumulo.mr.utils; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.ByteSequence; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.file.FileSKVIterator; -import org.apache.accumulo.core.file.rfile.RFileOperations; -import org.apache.accumulo.core.util.ArgumentChecker; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; - -/** - * Finds the accumulo tablet files on the hdfs disk, and uses that as the input for MR jobs - * Date: 5/11/12 - * Time: 2:04 PM - */ -public class AccumuloHDFSFileInputFormat extends FileInputFormat<Key, Value> { - - public static final Range ALLRANGE = new Range(new Text("\u0000"), new Text("\uFFFD")); - - @Override - public List<InputSplit> getSplits(JobContext jobContext) throws IOException { - //read the params from AccumuloInputFormat - Configuration conf = jobContext.getConfiguration(); - Instance instance = AccumuloProps.getInstance(jobContext); - String user = AccumuloProps.getUsername(jobContext); - AuthenticationToken password = AccumuloProps.getPassword(jobContext); - String table = AccumuloProps.getTablename(jobContext); - ArgumentChecker.notNull(instance); - ArgumentChecker.notNull(table); - - //find the files necessary - try { - AccumuloConfiguration acconf = instance.getConfiguration(); - FileSystem fs = FileSystem.get(conf); - Connector connector = instance.getConnector(user, password); - TableOperations tos = connector.tableOperations(); - String tableId = tos.tableIdMap().get(table); - String filePrefix = acconf.get(Property.INSTANCE_DFS_DIR) + "/tables/" + tableId; - System.out.println(filePrefix); - - Scanner scanner = 
connector.createScanner("!METADATA", Constants.NO_AUTHS); //TODO: auths? - scanner.setRange(new Range(new Text(tableId + "\u0000"), new Text(tableId + "\uFFFD"))); - scanner.fetchColumnFamily(new Text("file")); - List<String> files = new ArrayList<String>(); - List<InputSplit> fileSplits = new ArrayList<InputSplit>(); - Job job = new Job(conf); - for (Map.Entry<Key, Value> entry : scanner) { - String file = filePrefix + entry.getKey().getColumnQualifier().toString(); - files.add(file); - Path path = new Path(file); - FileStatus fileStatus = fs.getFileStatus(path); - long len = fileStatus.getLen(); - BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, len); - fileSplits.add(new FileSplit(path, 0, len, fileBlockLocations[0].getHosts())); -// FileInputFormat.addInputPath(job, path); - } - System.out.println(files); - return fileSplits; -// return super.getSplits(job); - } catch (Exception e) { - throw new IOException(e); - } - } - - @Override - public RecordReader<Key, Value> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - return new RecordReader<Key, Value>() { - - private FileSKVIterator fileSKVIterator; - - @Override - public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - FileSplit split = (FileSplit) inputSplit; - Configuration job = taskAttemptContext.getConfiguration(); - Path file = split.getPath(); -// long start = split.getStart(); -// long length = split.getLength(); - FileSystem fs = file.getFileSystem(job); -// FSDataInputStream fileIn = fs.open(file); -// System.out.println(start); -// if (start != 0L) { -// fileIn.seek(start); -// } - Instance instance = AccumuloProps.getInstance(taskAttemptContext); - - fileSKVIterator = RFileOperations.getInstance().openReader(file.toString(), ALLRANGE, - new HashSet<ByteSequence>(), false, fs, job, instance.getConfiguration()); -// fileSKVIterator = new RFileOperations2().openReader(fileIn, length - start, job); - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - fileSKVIterator.next(); - return fileSKVIterator.hasTop(); - } - - @Override - public Key getCurrentKey() throws IOException, InterruptedException { - return fileSKVIterator.getTopKey(); - } - - @Override - public Value getCurrentValue() throws IOException, InterruptedException { - return fileSKVIterator.getTopValue(); - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return 0; - } - - @Override - public void close() throws IOException { - //To change body of implemented methods use File | Settings | File Templates. 
- } - }; - } - - public static void main(String[] args) { - try { - Job job = new Job(new Configuration()); - job.setJarByClass(AccumuloHDFSFileInputFormat.class); - Configuration conf = job.getConfiguration(); - conf.setBoolean("mapred.map.tasks.speculative.execution", false); - conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); - AccumuloInputFormat.setConnectorInfo(job, "root", new PasswordToken("secret")); - AccumuloInputFormat.setInputTableName(job, "l_spo"); - AccumuloInputFormat.setScanAuthorizations(job, Constants.NO_AUTHS); - AccumuloInputFormat.setZooKeeperInstance(job, "acu13", "stratus25:2181"); - AccumuloInputFormat.setRanges(job, Collections.singleton(ALLRANGE)); - job.setMapperClass(NullMapper.class); - job.setNumReduceTasks(0); - job.setOutputFormatClass(NullOutputFormat.class); - if (args.length == 0) { - job.setInputFormatClass(AccumuloHDFSFileInputFormat.class); - } else { - job.setInputFormatClass(AccumuloInputFormat.class); - } - job.waitForCompletion(true); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @SuppressWarnings("rawtypes") - public static class NullMapper extends Mapper { - @Override - protected void map(Object key, Object value, Context context) throws IOException, InterruptedException { - - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloProps.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloProps.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloProps.java deleted file mode 100644 index 2b89440..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloProps.java +++ /dev/null @@ -1,58 +0,0 @@ -package mvm.rya.accumulo.mr.utils; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - - -import java.io.IOException; - -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.mapreduce.InputFormatBase; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -@SuppressWarnings("rawtypes") -public class AccumuloProps extends InputFormatBase { - - @Override - public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - throw new UnsupportedOperationException("Accumulo Props just holds properties"); - } - - public static Instance getInstance(JobContext conf) { - return InputFormatBase.getInstance(conf); - } - - public static AuthenticationToken getPassword(JobContext conf) { - return InputFormatBase.getAuthenticationToken(conf); - } - - public static String getUsername(JobContext conf) { - return InputFormatBase.getPrincipal(conf); - } - - public static String getTablename(JobContext conf) { - return InputFormatBase.getInputTableName(conf); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/MRUtils.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/MRUtils.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/MRUtils.java deleted file mode 100644 index c3003d3..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/MRUtils.java +++ /dev/null @@ -1,119 +0,0 @@ -package mvm.rya.accumulo.mr.utils; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - - -import org.apache.hadoop.conf.Configuration; -import org.openrdf.model.URI; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; - -/** - * Class MRSailUtils - * Date: May 19, 2011 - * Time: 10:34:06 AM - */ -public class MRUtils { - - public static final String JOB_NAME_PROP = "mapred.job.name"; - - public static final String AC_USERNAME_PROP = "ac.username"; - public static final String AC_PWD_PROP = "ac.pwd"; - public static final String AC_ZK_PROP = "ac.zk"; - public static final String AC_INSTANCE_PROP = "ac.instance"; - public static final String AC_TTL_PROP = "ac.ttl"; - public static final String AC_TABLE_PROP = "ac.table"; - public static final String AC_AUTH_PROP = "ac.auth"; - public static final String AC_CV_PROP = "ac.cv"; - public static final String AC_MOCK_PROP = "ac.mock"; - public static final String AC_HDFS_INPUT_PROP = "ac.hdfsinput"; - public static final String HADOOP_IO_SORT_MB = "ac.hdfsinput"; - public static final String TABLE_LAYOUT_PROP = "rdf.tablelayout"; - public static final String FORMAT_PROP = "rdf.format"; - public static final String INPUT_PATH = "input"; - - public static final String NAMED_GRAPH_PROP = "rdf.graph"; - - public static final String TABLE_PREFIX_PROPERTY = "rdf.tablePrefix"; - - // rdf constants - public static final ValueFactory vf = new ValueFactoryImpl(); - public static final URI RDF_TYPE = vf.createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#", "type"); - - - // cloudbase map reduce utils - -// public static Range retrieveRange(URI entry_key, URI entry_val) throws IOException { -// ByteArrayDataOutput startRowOut = ByteStreams.newDataOutput(); -// startRowOut.write(RdfCloudTripleStoreUtils.writeValue(entry_key)); -// if (entry_val != null) { -// startRowOut.write(RdfCloudTripleStoreConstants.DELIM_BYTES); -// startRowOut.write(RdfCloudTripleStoreUtils.writeValue(entry_val)); -// } -// byte[] startrow = startRowOut.toByteArray(); -// startRowOut.write(RdfCloudTripleStoreConstants.DELIM_STOP_BYTES); -// byte[] stoprow = startRowOut.toByteArray(); -// -// Range range = new Range(new Text(startrow), new Text(stoprow)); -// return range; -// } - - - public static String getACTtl(Configuration conf) { - return conf.get(AC_TTL_PROP); - } - - public static String getACUserName(Configuration conf) { - return conf.get(AC_USERNAME_PROP); - } - - public static String getACPwd(Configuration conf) { - return conf.get(AC_PWD_PROP); - } - - public static String getACZK(Configuration conf) { - return conf.get(AC_ZK_PROP); - } - - public static String getACInstance(Configuration conf) { - return conf.get(AC_INSTANCE_PROP); - } - - public static void setACUserName(Configuration conf, String str) { - conf.set(AC_USERNAME_PROP, str); - } - - public static void setACPwd(Configuration conf, String str) { - conf.set(AC_PWD_PROP, str); - } - - public static void setACZK(Configuration conf, String str) { - conf.set(AC_ZK_PROP, str); - } - - public static void setACInstance(Configuration conf, String str) { - conf.set(AC_INSTANCE_PROP, str); - } - - public static void setACTtl(Configuration conf, String str) { - conf.set(AC_TTL_PROP, str); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java 
b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java deleted file mode 100644 index 1e74e7c..0000000 --- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java +++ /dev/null @@ -1,225 +0,0 @@ -package mvm.rya.accumulo.mr; -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.AccumuloRyaDAO; -import mvm.rya.accumulo.RyaTableMutationsFactory; -import mvm.rya.accumulo.mr.RyaStatementInputFormat.RyaStatementRecordReader; -import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.resolver.RyaTripleContext; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.data.Mutation; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskID; -import org.apache.hadoop.mapreduce.task.JobContextImpl; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; -import org.apache.hadoop.mrunit.mapreduce.MapDriver; -import org.apache.hadoop.mrunit.mapreduce.ReduceDriver; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -public class RyaInputFormatTest { - - static String username = "root", table = "rya_spo"; - static PasswordToken password = new PasswordToken(""); - - static Instance instance; - static AccumuloRyaDAO apiImpl; - - @BeforeClass - public static void init() throws Exception { - instance = new MockInstance("mock_instance"); - Connector connector = instance.getConnector(username, password); - connector.tableOperations().create(table); - - AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - conf.setTablePrefix("rya_"); - conf.setDisplayQueryPlan(false); - - apiImpl = new AccumuloRyaDAO(); - apiImpl.setConf(conf); - apiImpl.setConnector(connector); - } - - @Before - public void before() throws Exception { - apiImpl.init(); - } - - @After - public void after() throws 
Exception { - apiImpl.dropAndDestroy(); - } - - @Test - public void testInputFormat() throws Exception { - - - RyaStatement input = RyaStatement.builder() - .setSubject(new RyaURI("http://www.google.com")) - .setPredicate(new RyaURI("http://some_other_uri")) - .setObject(new RyaURI("http://www.yahoo.com")) - .setColumnVisibility(new byte[0]) - .setValue(new byte[0]) - .build(); - - apiImpl.add(input); - - Job jobConf = Job.getInstance(); - - RyaStatementInputFormat.setMockInstance(jobConf, instance.getInstanceName()); - RyaStatementInputFormat.setConnectorInfo(jobConf, username, password); - RyaStatementInputFormat.setTableLayout(jobConf, TABLE_LAYOUT.SPO); - - AccumuloInputFormat.setInputTableName(jobConf, table); - AccumuloInputFormat.setInputTableName(jobConf, table); - AccumuloInputFormat.setScanIsolation(jobConf, false); - AccumuloInputFormat.setLocalIterators(jobConf, false); - AccumuloInputFormat.setOfflineTableScan(jobConf, false); - - RyaStatementInputFormat inputFormat = new RyaStatementInputFormat(); - - JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID()); - - List<InputSplit> splits = inputFormat.getSplits(context); - - Assert.assertEquals(1, splits.size()); - - TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1)); - - RecordReader reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext); - - RyaStatementRecordReader ryaStatementRecordReader = (RyaStatementRecordReader)reader; - ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext); - - List<RyaStatement> results = new ArrayList<RyaStatement>(); - while(ryaStatementRecordReader.nextKeyValue()) { - RyaStatementWritable writable = ryaStatementRecordReader.getCurrentValue(); - RyaStatement value = writable.getRyaStatement(); - Text text = ryaStatementRecordReader.getCurrentKey(); - RyaStatement stmt = RyaStatement.builder() - .setSubject(value.getSubject()) - .setPredicate(value.getPredicate()) - .setObject(value.getObject()) - .setContext(value.getContext()) - .setQualifier(value.getQualifer()) - .setColumnVisibility(value.getColumnVisibility()) - .setValue(value.getValue()) - .build(); - results.add(stmt); - - System.out.println(text); - System.out.println(value); - } - - Assert.assertTrue(results.size() == 2); - Assert.assertTrue(results.contains(input)); - } - - @Test - public void mapperTest() throws Exception { - - RyaStatement input = RyaStatement.builder() - .setSubject(new RyaURI("http://www.google.com")) - .setPredicate(new RyaURI("http://some_other_uri")) - .setObject(new RyaURI("http://www.yahoo.com")) - .setValue(new byte[0]) - .setTimestamp(0L) - .build(); - - RyaStatementWritable writable = new RyaStatementWritable(); - writable.setRyaStatement(input); - - RyaStatementMapper mapper = new RyaStatementMapper(); - MapDriver<Text, RyaStatementWritable, Text, Mutation> mapDriver = MapDriver.newMapDriver(mapper); - - RyaTripleContext context = RyaTripleContext.getInstance(new AccumuloRdfConfiguration()); - RyaTableMutationsFactory mutationsFactory = new RyaTableMutationsFactory(context); - - Map<TABLE_LAYOUT, Collection<Mutation>> mutations = mutationsFactory.serialize(input); - - mapDriver.withInput(new Text("sometext"), writable); - - for(TABLE_LAYOUT key : mutations.keySet()) { - Collection<Mutation> mutationCollection = mutations.get(key); - for(Mutation m : mutationCollection) { - mapDriver.withOutput(new Text("rya_" + key.name().toLowerCase()), m); - } - } - - 
mapDriver.runTest(false); - - } - - @Test - public void reducerTest() throws Exception { - RyaStatement input = RyaStatement.builder() - .setSubject(new RyaURI("http://www.google.com")) - .setPredicate(new RyaURI("http://some_other_uri")) - .setObject(new RyaURI("http://www.yahoo.com")) - .setValue(new byte[0]) - .setTimestamp(0L) - .build(); - - RyaStatementWritable writable = new RyaStatementWritable(); - writable.setRyaStatement(input); - - RyaStatementReducer reducer = new RyaStatementReducer(); - ReduceDriver<WritableComparable, RyaStatementWritable, Text, Mutation> reduceDriver = ReduceDriver.newReduceDriver(reducer); - - RyaTripleContext context = RyaTripleContext.getInstance(new AccumuloRdfConfiguration()); - RyaTableMutationsFactory mutationsFactory = new RyaTableMutationsFactory(context); - - Map<TABLE_LAYOUT, Collection<Mutation>> mutations = mutationsFactory.serialize(input); - - reduceDriver.withInput(new Text("sometext"), Arrays.asList(writable)); - - for(TABLE_LAYOUT key : mutations.keySet()) { - Collection<Mutation> mutationCollection = mutations.get(key); - for(Mutation m : mutationCollection) { - reduceDriver.withOutput(new Text("rya_" + key.name().toLowerCase()), m); - } - } - - reduceDriver.runTest(false); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountToolTest.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountToolTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountToolTest.java deleted file mode 100644 index bda73e2..0000000 --- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountToolTest.java +++ /dev/null @@ -1,282 +0,0 @@ -package mvm.rya.accumulo.mr.eval; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.AccumuloRyaDAO; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.resolver.RdfToRyaConversions; -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.admin.SecurityOperations; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.PartialKey; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.TablePermission; -import org.apache.hadoop.io.Text; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; - -import java.util.HashMap; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Created by IntelliJ IDEA. - * Date: 4/24/12 - * Time: 5:05 PM - * To change this template use File | Settings | File Templates. - */ -@Ignore -public class AccumuloRdfCountToolTest { - - private String user = "user"; - private String pwd = "pwd"; - private String instance = "myinstance"; - private String tablePrefix = "t_"; - private Authorizations auths = Constants.NO_AUTHS; - private Connector connector; - - private AccumuloRyaDAO dao; - private ValueFactory vf = new ValueFactoryImpl(); - private AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - static String litdupsNS = "urn:test:litdups#"; - - @Before - public void setUp() throws Exception { - connector = new MockInstance(instance).getConnector(user, pwd.getBytes()); - connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); - connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX); - connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX); - connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX); - connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX); - SecurityOperations secOps = connector.securityOperations(); - secOps.createUser(user, pwd.getBytes(), auths); - secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, TablePermission.READ); - secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX, TablePermission.READ); - secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX, TablePermission.READ); - secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX, TablePermission.READ); - secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, TablePermission.READ); - secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, TablePermission.WRITE); - - dao = new AccumuloRyaDAO(); - dao.setConnector(connector); - conf.setTablePrefix(tablePrefix); - dao.setConf(conf); - dao.init(); - } - - @After - public void tearDown() throws Exception { - dao.destroy(); - connector.tableOperations().delete(tablePrefix + 
RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); - connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX); - connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX); - connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX); - connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX); - } - - @Test - public void testMR() throws Exception { - RyaURI test1 = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "test1")); - RyaURI pred1 = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "pred1")); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(0)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(1)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(2)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(3)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(4)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(5)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(6)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(7)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(8)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(9)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(10)))); - - AccumuloRdfCountTool.main(new String[]{ - "-Dac.mock=true", - "-Dac.instance=" + instance, - "-Dac.username=" + user, - "-Dac.pwd=" + pwd, - "-Drdf.tablePrefix=" + tablePrefix, - }); - - Map<String, Key> expectedValues = new HashMap<String, Key>(); - String row = test1.getData(); - expectedValues.put(row, - new Key(new Text(row), - RdfCloudTripleStoreConstants.SUBJECT_CF_TXT, - RdfCloudTripleStoreConstants.EMPTY_TEXT)); - row = pred1.getData(); - expectedValues.put(row, - new Key(new Text(row), - RdfCloudTripleStoreConstants.PRED_CF_TXT, - RdfCloudTripleStoreConstants.EMPTY_TEXT)); - Scanner scanner = connector.createScanner(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, auths); - scanner.setRange(new Range()); - int count = 0; - for (Map.Entry<Key, Value> entry : scanner) { - assertTrue(expectedValues.get(entry.getKey().getRow().toString()).equals(entry.getKey(), PartialKey.ROW_COLFAM_COLQUAL)); - assertEquals(11, Long.parseLong(entry.getValue().toString())); - count++; - } - assertEquals(2, count); - } - -// public void testMRObject() throws Exception { -// URI pred1 = vf.createURI(litdupsNS, "pred1"); -// Literal literal = vf.createLiteral(0); -// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test0"), pred1, literal)); -// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test1"), pred1, literal)); -// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test2"), pred1, literal)); -// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test3"), pred1, literal)); -// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test4"), pred1, literal)); -// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test5"), pred1, literal)); -// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test6"), pred1, literal)); -// dao.add(new 
StatementImpl(vf.createURI(litdupsNS, "test7"), pred1, literal)); -// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test8"), pred1, literal)); -// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test9"), pred1, literal)); -// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test10"), pred1, literal)); -// dao.commit(); -// -// AccumuloRdfCountTool.main(new String[]{ -// "-Dac.mock=true", -// "-Dac.instance=" + instance, -// "-Dac.username=" + user, -// "-Dac.pwd=" + pwd, -// "-Drdf.tablePrefix=" + tablePrefix, -// }); -// -// Map<String, Key> expectedValues = new HashMap<String, Key>(); -// byte[] row_bytes = RdfCloudTripleStoreUtils.writeValue(literal); -// expectedValues.put(new String(row_bytes), -// new Key(new Text(row_bytes), -// RdfCloudTripleStoreConstants.OBJ_CF_TXT, -// RdfCloudTripleStoreConstants.INFO_TXT)); -// row_bytes = RdfCloudTripleStoreUtils.writeValue(pred1); -// expectedValues.put(new String(row_bytes), -// new Key(new Text(row_bytes), -// RdfCloudTripleStoreConstants.PRED_CF_TXT, -// RdfCloudTripleStoreConstants.INFO_TXT)); -// Scanner scanner = connector.createScanner(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, auths); -// scanner.setRange(new Range()); -// int count = 0; -// for (Map.Entry<Key, Value> entry : scanner) { -// assertTrue(expectedValues.get(entry.getKey().getRow().toString()).equals(entry.getKey(), PartialKey.ROW_COLFAM_COLQUAL)); -// assertEquals(11, Long.parseLong(entry.getValue().toString())); -// count++; -// } -// assertEquals(2, count); -// } - - @Test - public void testTTL() throws Exception { - RyaURI test1 = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "test1")); - RyaURI pred1 = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "pred1")); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(0)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(1)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(2)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(3)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(4)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(5)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(6)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(7)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(8)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(9)))); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(10)))); - - AccumuloRdfCountTool.main(new String[]{ - "-Dac.mock=true", - "-Dac.instance=" + instance, - "-Dac.username=" + user, - "-Dac.pwd=" + pwd, - "-Dac.ttl=0", - "-Drdf.tablePrefix=" + tablePrefix, - }); - - Scanner scanner = connector.createScanner(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, auths); - scanner.setRange(new Range()); - int count = 0; - for (Map.Entry<Key, Value> entry : scanner) { - count++; - } - assertEquals(0, count); - } - - @Test - public void testContext() throws Exception { - RyaURI test1 = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "test1")); - RyaURI pred1 = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "pred1")); - RyaURI cntxt 
= RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "cntxt")); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(0)), cntxt)); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(1)), cntxt)); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(2)), cntxt)); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(3)), cntxt)); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(4)), cntxt)); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(5)), cntxt)); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(6)), cntxt)); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(7)), cntxt)); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(8)), cntxt)); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(9)), cntxt)); - dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(10)), cntxt)); - - AccumuloRdfCountTool.main(new String[]{ - "-Dac.mock=true", - "-Dac.instance=" + instance, - "-Dac.username=" + user, - "-Dac.pwd=" + pwd, - "-Drdf.tablePrefix=" + tablePrefix, - }); - - Map<String, Key> expectedValues = new HashMap<String, Key>(); - String row = test1.getData(); - expectedValues.put(row, - new Key(new Text(row), - RdfCloudTripleStoreConstants.SUBJECT_CF_TXT, - new Text(cntxt.getData()))); - row = pred1.getData(); - expectedValues.put(row, - new Key(new Text(row), - RdfCloudTripleStoreConstants.PRED_CF_TXT, - new Text(cntxt.getData()))); - Scanner scanner = connector.createScanner(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, auths); - scanner.setRange(new Range()); - int count = 0; - for (Map.Entry<Key, Value> entry : scanner) { - assertTrue(expectedValues.get(entry.getKey().getRow().toString()).equals(entry.getKey(), PartialKey.ROW_COLFAM_COLQUAL)); - assertEquals(11, Long.parseLong(entry.getValue().toString())); - count++; - } - assertEquals(2, count); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputToolTest.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputToolTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputToolTest.java deleted file mode 100644 index 02b8357..0000000 --- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputToolTest.java +++ /dev/null @@ -1,146 +0,0 @@ -package mvm.rya.accumulo.mr.fileinput; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import java.util.Iterator; -import java.util.Map; - -import junit.framework.TestCase; -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaType; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.resolver.RyaContext; -import mvm.rya.api.resolver.RyaTripleContext; -import mvm.rya.api.resolver.triple.TripleRow; - -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.admin.SecurityOperations; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.TablePermission; -import org.apache.hadoop.io.Text; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.rio.RDFFormat; - -/** - * Created by IntelliJ IDEA. - * Date: 4/25/12 - * Time: 10:51 AM - * To change this template use File | Settings | File Templates. - */ -public class RdfFileInputToolTest extends TestCase { - - private String user = "user"; - private String pwd = "pwd"; - private String instance = "myinstance"; - private String tablePrefix = "t_"; - private Authorizations auths = Constants.NO_AUTHS; - private Connector connector; - - @Override - public void setUp() throws Exception { - super.setUp(); - connector = new MockInstance(instance).getConnector(user, pwd.getBytes()); - connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); - connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX); - connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX); - connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX); - connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX); - SecurityOperations secOps = connector.securityOperations(); - secOps.createUser(user, pwd.getBytes(), auths); - secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, TablePermission.READ); - secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX, TablePermission.READ); - secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX, TablePermission.READ); - secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX, TablePermission.READ); - secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, TablePermission.READ); - secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, TablePermission.WRITE); - } - - @Override - public void tearDown() throws Exception 
{ - super.tearDown(); - connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); - connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX); - connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX); - connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX); - connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX); - } - - public void testNTriplesInput() throws Exception { - RdfFileInputTool.main(new String[]{ - "-Dac.mock=true", - "-Dac.instance=" + instance, - "-Dac.username=" + user, - "-Dac.pwd=" + pwd, - "-Drdf.tablePrefix=" + tablePrefix, - "-Drdf.format=" + RDFFormat.NTRIPLES.getName(), - "src/test/resources/test.ntriples", - }); - - Scanner scanner = connector.createScanner(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, auths); - scanner.setRange(new Range()); - Iterator<Map.Entry<Key, Value>> iterator = scanner.iterator(); - ValueFactory vf = new ValueFactoryImpl(); - assertTrue(iterator.hasNext()); - RyaStatement rs = new RyaStatement(new RyaURI("urn:lubm:rdfts#GraduateStudent01"), - new RyaURI("urn:lubm:rdfts#hasFriend"), - new RyaURI("urn:lubm:rdfts#GraduateStudent02")); - assertEquals(new Text(RyaTripleContext.getInstance(new AccumuloRdfConfiguration()).serializeTriple(rs).get(TABLE_LAYOUT.SPO).getRow()), iterator.next().getKey().getRow()); - } - - public void testInputContext() throws Exception { - RdfFileInputTool.main(new String[]{ - "-Dac.mock=true", - "-Dac.instance=" + instance, - "-Dac.username=" + user, - "-Dac.pwd=" + pwd, - "-Drdf.tablePrefix=" + tablePrefix, - "-Drdf.format=" + RDFFormat.TRIG.getName(), - "src/test/resources/namedgraphs.trig", - }); - - Scanner scanner = connector.createScanner(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, auths); - scanner.setRange(new Range()); - Iterator<Map.Entry<Key, Value>> iterator = scanner.iterator(); - ValueFactory vf = new ValueFactoryImpl(); - assertTrue(iterator.hasNext()); - RyaStatement rs = new RyaStatement(new RyaURI("http://www.example.org/exampleDocument#Monica"), - new RyaURI("http://www.example.org/vocabulary#name"), - new RyaType("Monica Murphy"), - new RyaURI("http://www.example.org/exampleDocument#G1")); - Key key = iterator.next().getKey(); - - TripleRow tripleRow = RyaTripleContext.getInstance(new AccumuloRdfConfiguration()).serializeTriple(rs).get(TABLE_LAYOUT.SPO); - assertEquals(new Text(tripleRow.getRow()), key.getRow()); - assertEquals(new Text(tripleRow.getColumnFamily()), key.getColumnFamily()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java deleted file mode 100644 index 5ac2d74..0000000 --- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java +++ /dev/null @@ -1,319 +0,0 @@ -package mvm.rya.accumulo.mr.upgrade; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import junit.framework.TestCase; -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.AccumuloRyaDAO; -import mvm.rya.accumulo.query.AccumuloRyaQueryEngine; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaType; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.persist.RyaDAOException; -import mvm.rya.api.persist.query.RyaQuery; -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.*; -import org.apache.accumulo.core.client.admin.SecurityOperations; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.TablePermission; -import org.calrissian.mango.collect.CloseableIterable; -import org.openrdf.model.vocabulary.XMLSchema; - -import java.io.IOException; -import java.util.Iterator; -import java.util.Map; - -/** - * Created by IntelliJ IDEA. - * Date: 4/25/12 - * Time: 10:51 AM - * To change this template use File | Settings | File Templates. 
- */ -public class Upgrade322ToolTest extends TestCase { - - private String user = "user"; - private String pwd = "pwd"; - private String instance = "myinstance"; - private String tablePrefix = "t_"; - private Authorizations auths = Constants.NO_AUTHS; - private Connector connector; - - @Override - public void setUp() throws Exception { - super.setUp(); - - final String spoTable = tablePrefix + - RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX; - final String poTable = tablePrefix + - RdfCloudTripleStoreConstants.TBL_PO_SUFFIX; - final String ospTable = tablePrefix + - RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX; - - connector = new MockInstance(instance).getConnector(user, pwd.getBytes()); - - connector.tableOperations().create(spoTable); - connector.tableOperations().create(poTable); - connector.tableOperations().create(ospTable); - connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX); - connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX); - SecurityOperations secOps = connector.securityOperations(); - secOps.createUser(user, pwd.getBytes(), auths); - secOps.grantTablePermission(user, spoTable, TablePermission.READ); - secOps.grantTablePermission(user, poTable, TablePermission.READ); - secOps.grantTablePermission(user, ospTable, TablePermission.READ); - secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX, TablePermission.READ); - secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, TablePermission.READ); - secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, TablePermission.WRITE); - - //load data - final BatchWriter ospWriter = connector - .createBatchWriter(ospTable, new BatchWriterConfig()); - ospWriter.addMutation(getMutation("00000000000000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0000http://here/2010/tracked-data-provenance/ns#longLit\u0001\u0004")); - ospWriter.addMutation(getMutation("00000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10" + - "\u0000http://here/2010/tracked-data-provenance/ns#intLit\u0001\u0005")); - ospWriter.addMutation(getMutation("00000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10" + - "\u0000http://here/2010/tracked-data-provenance/ns#byteLit\u0001\t")); - ospWriter.addMutation(getMutation("00001 1.0\u0000http://here/2010/tracked-data-provenance/ns#uuid10" + - "\u0000http://here/2010/tracked-data-provenance/ns#doubleLit\u0001\u0006")); - ospWriter.addMutation(getMutation("10\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0000http" + - "://here/2010/tracked-data-provenance/ns#shortLit\u0001http://www.w3" + - ".org/2001/XMLSchema#short\u0001\b")); - ospWriter.addMutation(getMutation("10.0\u0000http://here/2010/tracked-data-provenance/ns#uuid10" + - "\u0000http://here/2010/tracked-data-provenance/ns#floatLit\u0001http" + - "://www.w3.org/2001/XMLSchema#float\u0001\b")); - ospWriter.addMutation(getMutation("3.0.0\u0000urn:mvm.rya/2012/05#rts\u0000urn:mvm" + - ".rya/2012/05#version\u0001\u0003")); - ospWriter.addMutation(getMutation("9223370726404375807\u0000http://here/2010/tracked-data-provenance/ns" + - "#uuid10\u0000http://here/2010/tracked-data-provenance/ns#dateLit" + - "\u0001\u0007")); - ospWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#Created\u0000http://here" + - "/2010/tracked-data-provenance/ns#uuid10\u0000http://www.w3" + - 
".org/1999/02/22-rdf-syntax-ns#type\u0001\u0002")); - ospWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#objectuuid1\u0000http" + - "://here/2010/tracked-data-provenance/ns#uuid10\u0000http://here/2010" + - "/tracked-data-provenance/ns#uriLit\u0001\u0002")); - ospWriter.addMutation(getMutation("stringLit\u0000http://here/2010/tracked-data-provenance/ns#uuid10" + - "\u0000http://here/2010/tracked-data-provenance/ns#stringLit\u0001" + - "\u0003")); - ospWriter.addMutation(getMutation("true\u0000http://here/2010/tracked-data-provenance/ns#uuid10" + - "\u0000http://here/2010/tracked-data-provenance/ns#booleanLit\u0001\n")); - ospWriter.flush(); - ospWriter.close(); - - final BatchWriter spoWriter = connector - .createBatchWriter(spoTable, new BatchWriterConfig()); - spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10\u0000http://here/2010/tracked-data-provenance/ns#longLit\u000000000000000000000010\u0001\u0004")); - spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" + - "\u0000http://here/2010/tracked-data-provenance/ns#intLit\u000000000000010\u0001\u0005")); - spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" + - "\u0000http://here/2010/tracked-data-provenance/ns#byteLit\u000000000010\u0001\t")); - spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" + - "\u0000http://here/2010/tracked-data-provenance/ns#doubleLit\u000000001 1.0\u0001\u0006")); - spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10\u0000http" + - "://here/2010/tracked-data-provenance/ns#shortLit\u000010\u0001http://www.w3" + - ".org/2001/XMLSchema#short\u0001\b")); - spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" + - "\u0000http://here/2010/tracked-data-provenance/ns#floatLit\u0001http" + - "://www.w3.org/2001/XMLSchema#float\u000010.0\u0001\b")); - spoWriter.addMutation(getMutation("urn:mvm.rya/2012/05#rts\u0000urn:mvm" + - ".rya/2012/05#version\u00003.0.0\u0001\u0003")); - spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns" + - "#uuid10\u0000http://here/2010/tracked-data-provenance/ns#dateLit" + - "\u00009223370726404375807\u0001\u0007")); - spoWriter.addMutation(getMutation("http://here" + - "/2010/tracked-data-provenance/ns#uuid10\u0000http://www.w3" + - ".org/1999/02/22-rdf-syntax-ns#type\u0000http://here/2010/tracked-data-provenance/ns#Created\u0001\u0002")); - spoWriter.addMutation(getMutation("http" + - "://here/2010/tracked-data-provenance/ns#uuid10\u0000http://here/2010" + - "/tracked-data-provenance/ns#uriLit\u0000http://here/2010/tracked-data-provenance/ns#objectuuid1\u0001\u0002")); - spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" + - "\u0000http://here/2010/tracked-data-provenance/ns#stringLit\u0000stringLit\u0001" + - "\u0003")); - spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" + - "\u0000http://here/2010/tracked-data-provenance/ns#booleanLit\u0000true\u0001\n")); - spoWriter.flush(); - spoWriter.close(); - - final BatchWriter poWriter = connector - .createBatchWriter(poTable, new BatchWriterConfig()); - poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#longLit\u000000000000000000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0004")); - 
poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#intLit\u000000000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0005")); - poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#byteLit\u000000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\t")); - poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#doubleLit\u000000001 1.0\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0006")); - poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#shortLit\u000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001http://www.w3" + - ".org/2001/XMLSchema#short\u0001\b")); - poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#floatLit\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001http" + - "://www.w3.org/2001/XMLSchema#float\u000010.0\u0001\b")); - poWriter.addMutation(getMutation("urn:mvm" + - ".rya/2012/05#version\u00003.0.0\u0000urn:mvm.rya/2012/05#rts\u0001\u0003")); - poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#dateLit" + - "\u00009223370726404375807\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0007")); - poWriter.addMutation(getMutation("http://www.w3" + - ".org/1999/02/22-rdf-syntax-ns#type\u0000http://here/2010/tracked-data-provenance/ns#Created\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0002")); - poWriter.addMutation(getMutation("http://here/2010" + - "/tracked-data-provenance/ns#uriLit\u0000http://here/2010/tracked-data-provenance/ns#objectuuid1\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0002")); - poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#stringLit\u0000stringLit\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001" + - "\u0003")); - poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#booleanLit\u0000true\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\n")); - poWriter.flush(); - poWriter.close(); - } - - public Mutation getMutation(String row) { - final Mutation mutation = new Mutation(row); - mutation.put("", "", ""); - return mutation; - } - - @Override - public void tearDown() throws Exception { - super.tearDown(); - connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); - connector.tableOperations().delete( - tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX); - connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX); - connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX); - connector.tableOperations().delete( - tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX); - } - - public void testUpgrade() throws Exception { - Upgrade322Tool.main(new String[]{ - "-Dac.mock=true", - "-Dac.instance=" + instance, - "-Dac.username=" + user, - "-Dac.pwd=" + pwd, - "-Drdf.tablePrefix=" + tablePrefix, - }); - - final AccumuloRdfConfiguration configuration = new AccumuloRdfConfiguration(); - configuration.setTablePrefix(tablePrefix); - final AccumuloRyaDAO ryaDAO = new AccumuloRyaDAO(); - ryaDAO.setConnector(connector); - ryaDAO.setConf(configuration); - ryaDAO.init(); - - final AccumuloRyaQueryEngine queryEngine = ryaDAO.getQueryEngine(); - - verify(new RyaStatement( - new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"), - new 
RyaURI("http://here/2010/tracked-data-provenance/ns#booleanLit"), - new RyaType(XMLSchema.BOOLEAN, "true")), queryEngine); - verify(new RyaStatement( - new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"), - new RyaURI("http://here/2010/tracked-data-provenance/ns#longLit"), - new RyaType(XMLSchema.LONG, "10")), queryEngine); - verify(new RyaStatement( - new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"), - new RyaURI("http://here/2010/tracked-data-provenance/ns#intLit"), - new RyaType(XMLSchema.INTEGER, "10")), queryEngine); - verify(new RyaStatement( - new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"), - new RyaURI("http://here/2010/tracked-data-provenance/ns#byteLit"), - new RyaType(XMLSchema.BYTE, "10")), queryEngine); - verify(new RyaStatement( - new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"), - new RyaURI("http://here/2010/tracked-data-provenance/ns#doubleLit"), - new RyaType(XMLSchema.DOUBLE, "10.0")), queryEngine); - verify(new RyaStatement( - new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"), - new RyaURI("http://here/2010/tracked-data-provenance/ns#dateLit"), - new RyaType(XMLSchema.DATETIME, "2011-07-12T06:00:00.000Z")), queryEngine); - verify(new RyaStatement( - new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"), - new RyaURI("http://here/2010/tracked-data-provenance/ns#stringLit"), - new RyaType("stringLit")), queryEngine); - verify(new RyaStatement( - new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"), - new RyaURI("http://here/2010/tracked-data-provenance/ns#uriLit"), - new RyaURI("http://here/2010/tracked-data-provenance/ns" + - "#objectuuid1")), queryEngine); - verify(new RyaStatement( - new RyaURI("urn:mvm.rya/2012/05#rts"), - new RyaURI("urn:mvm.rya/2012/05#version"), - new RyaType("3.0.0")), queryEngine); - } - - private void verify(RyaStatement ryaStatement, AccumuloRyaQueryEngine queryEngine) - throws RyaDAOException, IOException { - - //check osp - CloseableIterable<RyaStatement> statements = - queryEngine.query(RyaQuery.builder(new RyaStatement(null, null, ryaStatement.getObject())) - .build()); - try { - verifyFirstStatement(ryaStatement, statements); - } finally { - statements.close(); - } - - //check po - statements = queryEngine.query(RyaQuery.builder( - new RyaStatement(null, ryaStatement.getPredicate(), - ryaStatement.getObject())).build()); - try { - verifyFirstStatement(ryaStatement, statements); - } finally { - statements.close(); - } - - //check spo - statements = queryEngine.query(RyaQuery.builder( - new RyaStatement(ryaStatement.getSubject(), - ryaStatement.getPredicate(), - ryaStatement.getObject())).build()); - try { - verifyFirstStatement(ryaStatement, statements); - } finally { - statements.close(); - } - } - - private void verifyFirstStatement( - RyaStatement ryaStatement, CloseableIterable<RyaStatement> statements) { - final Iterator<RyaStatement> iterator = statements.iterator(); - assertTrue(iterator.hasNext()); - final RyaStatement first = iterator.next(); - assertEquals(ryaStatement.getSubject(), first.getSubject()); - assertEquals(ryaStatement.getPredicate(), first.getPredicate()); - assertEquals(ryaStatement.getObject(), first.getObject()); - assertFalse(iterator.hasNext()); - } - - public void printTableData(String tableName) - throws TableNotFoundException{ - Scanner scanner = connector.createScanner(tableName, auths); - scanner.setRange(new Range()); - for(Map.Entry<Key, Value> entry : scanner) { - final Key key = entry.getKey(); - final 
Value value = entry.getValue(); - System.out.println(key.getRow() + " " + key.getColumnFamily() + " " + key.getColumnQualifier() + " " + key.getTimestamp() + " " + value.toString()); - } - } - -}
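For reference, both deleted test classes follow the same pattern: stand up an in-memory Accumulo MockInstance, create the prefixed Rya tables, invoke the tool's main() with -D options, and then scan the SPO table to assert on the serialized rows. Below is a minimal, hypothetical sketch of that scan pattern; it is not part of this commit, the class name and literals are illustrative, and it assumes the same Accumulo 1.x mock client API (MockInstance, Connector, Scanner, Constants.NO_AUTHS) that the deleted code uses.

import java.util.Map;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;

// Hypothetical helper mirroring the MockInstance scan pattern used by the removed tests.
public class MockSpoScanSketch {
    public static void main(String[] args) throws Exception {
        // In-memory Accumulo instance and credentials, matching the fixtures above.
        Connector connector = new MockInstance("myinstance").getConnector("user", "pwd".getBytes());

        // The tests pre-create tablePrefix + "spo" ("t_spo") before running the tool.
        String spoTable = "t_spo";
        if (!connector.tableOperations().exists(spoTable)) {
            connector.tableOperations().create(spoTable);
        }

        // After the MapReduce job has written its mutations, a full-range scan
        // over the SPO table exposes the serialized row keys for assertions.
        Scanner scanner = connector.createScanner(spoTable, Constants.NO_AUTHS);
        scanner.setRange(new Range());
        for (Map.Entry<Key, Value> entry : scanner) {
            System.out.println(entry.getKey().getRow());
        }
    }
}

In the deleted tests, the expected row keys are computed by serializing a RyaStatement through RyaTripleContext and comparing the SPO TripleRow against the scanned Key's row (and, for the named-graph case, its column family).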
