http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java new file mode 100644 index 0000000..cdc3235 --- /dev/null +++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java @@ -0,0 +1,256 @@ +package mvm.rya.accumulo.mr; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.lang.builder.CompareToBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.WritableComparable; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.RyaTripleContext; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; + +/** + * Basic {@link WritableComparable} for using Rya data with Hadoop. + * RyaStatementWritable wraps a {@link RyaStatement}, which in turn represents a + * statement as a collection of {@link mvm.rya.api.domain.RyaURI} and + * {@link mvm.rya.api.domain.RyaType} objects. + * <p> + * This class is mutable, like all {@link org.apache.hadoop.io.Writable}s. When + * used as Mapper or Reducer input, the Hadoop framework will typically reuse + * the same object to load the next record. However, loading the next record + * will create a new RyaStatement internally. Therefore, if a statement must be + * stored for any length of time, be sure to extract the internal RyaStatement. + */ +public class RyaStatementWritable implements WritableComparable<RyaStatementWritable> { + private RyaTripleContext ryaContext; + private RyaStatement ryaStatement; + + /** + * Instantiates a RyaStatementWritable with the default RyaTripleContext. + * @param conf Unused. + */ + public RyaStatementWritable(Configuration conf) { + this(); + } + /** + * Instantiates a RyaStatementWritable with a given context. + * @param ryaContext Context used for reading and writing the statement. + */ + public RyaStatementWritable(RyaTripleContext ryaContext) { + this.ryaContext = ryaContext; + } + /** + * Instantiates a RyaStatementWritable with the default RyaTripleContext. + */ + public RyaStatementWritable() { + this.ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration()); + } + /** + * Instantiates a RyaStatementWritable with a given statement and context. 
+ * @param ryaStatement The statement (triple) represented by this object. + * @param ryaContext Context used for reading and writing the statement. + */ + public RyaStatementWritable(RyaStatement ryaStatement, RyaTripleContext ryaContext) { + this(ryaContext); + this.ryaStatement = ryaStatement; + } + + /** + * Gets the contained RyaStatement. + * @return The statement represented by this RyaStatementWritable. + */ + public RyaStatement getRyaStatement() { + return ryaStatement; + } + /** + * Sets the contained RyaStatement. + * @param ryaStatement The statement to be represented by this + * RyaStatementWritable. + */ + public void setRyaStatement(RyaStatement ryaStatement) { + this.ryaStatement = ryaStatement; + } + + /** + * Comparison method for natural ordering. Compares based on the logical + * triple (the s/p/o/context information in the underlying RyaStatement) + * and then by the metadata contained in the RyaStatement if the triples are + * the same. + * @return Zero if both RyaStatementWritables contain equivalent statements + * or both have null statements; otherwise, an integer whose sign + * corresponds to a consistent ordering. + */ + @Override + public int compareTo(RyaStatementWritable other) { + CompareToBuilder builder = new CompareToBuilder(); + RyaStatement rsThis = this.getRyaStatement(); + RyaStatement rsOther = other.getRyaStatement(); // should throw NPE if other is null, as per Comparable contract + builder.append(rsThis == null, rsOther == null); + if (rsThis != null && rsOther != null) { + builder.append(rsThis.getSubject(), rsOther.getSubject()); + builder.append(rsThis.getPredicate(), rsOther.getPredicate()); + builder.append(rsThis.getObject(), rsOther.getObject()); + builder.append(rsThis.getContext(), rsOther.getContext()); + builder.append(rsThis.getQualifer(), rsOther.getQualifer()); + builder.append(rsThis.getColumnVisibility(), rsOther.getColumnVisibility()); + builder.append(rsThis.getValue(), rsOther.getValue()); + builder.append(rsThis.getTimestamp(), rsOther.getTimestamp()); + } + return builder.toComparison(); + } + + /** + * Returns a hash based on the hashCode method in RyaStatement. + * @return A hash that should be consistent for equivalent RyaStatements. + */ + @Override + public int hashCode() { + if (ryaStatement == null) { + return 0; + } + return ryaStatement.hashCode(); + } + + /** + * Tests for equality using the equals method in RyaStatement. + * @param o Object to compare with + * @return true if both objects are RyaStatementWritables containing + * equivalent RyaStatements. + */ + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (o == null || !(o instanceof RyaStatementWritable)) { + return false; + } + RyaStatement rsThis = this.getRyaStatement(); + RyaStatement rsOther = ((RyaStatementWritable) o).getRyaStatement(); + if (rsThis == null) { + return rsOther == null; + } + else { + return rsThis.equals(rsOther); + } + } + + /** + * Serializes this RyaStatementWritable. + * @param dataOutput An output stream for serialized statement data. + * @throws IOException if the RyaStatement is null or otherwise can't be + * serialized. 
+ */
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+        if (ryaStatement == null) {
+            throw new IOException("Rya Statement is null");
+        }
+        try {
+            Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, TripleRow> map = ryaContext.serializeTriple(ryaStatement);
+            TripleRow tripleRow = map.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
+            byte[] row = tripleRow.getRow();
+            byte[] columnFamily = tripleRow.getColumnFamily();
+            byte[] columnQualifier = tripleRow.getColumnQualifier();
+            write(dataOutput, row);
+            write(dataOutput, columnFamily);
+            write(dataOutput, columnQualifier);
+            write(dataOutput, ryaStatement.getColumnVisibility());
+            write(dataOutput, ryaStatement.getValue());
+            Long timestamp = ryaStatement.getTimestamp();
+            boolean b = timestamp != null;
+            dataOutput.writeBoolean(b);
+            if (b) {
+                dataOutput.writeLong(timestamp);
+            }
+        } catch (TripleRowResolverException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Write part of a statement to an output stream.
+     * @param dataOutput Stream for writing serialized statements.
+     * @param row Individual field to write, as a byte array.
+     * @throws IOException if writing to the stream fails.
+     */
+    protected void write(DataOutput dataOutput, byte[] row) throws IOException {
+        boolean b = row != null;
+        dataOutput.writeBoolean(b);
+        if (b) {
+            dataOutput.writeInt(row.length);
+            dataOutput.write(row);
+        }
+    }
+
+    /**
+     * Read part of a statement from an input stream.
+     * @param dataInput Stream for reading serialized statements.
+     * @return The next individual field, as a byte array.
+     * @throws IOException if reading from the stream fails.
+     */
+    protected byte[] read(DataInput dataInput) throws IOException {
+        if (dataInput.readBoolean()) {
+            int len = dataInput.readInt();
+            byte[] bytes = new byte[len];
+            dataInput.readFully(bytes);
+            return bytes;
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Loads a RyaStatementWritable by reading data from an input stream.
+     * Creates a new RyaStatement and assigns it to this RyaStatementWritable.
+     * @param dataInput A stream containing serialized statement data.
+     */
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+        byte[] row = read(dataInput);
+        byte[] columnFamily = read(dataInput);
+        byte[] columnQualifier = read(dataInput);
+        byte[] columnVisibility = read(dataInput);
+        byte[] value = read(dataInput);
+        boolean b = dataInput.readBoolean();
+        Long timestamp = null;
+        if (b) {
+            timestamp = dataInput.readLong();
+        }
+        try {
+            ryaStatement = ryaContext.deserializeTriple(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO,
+                    new TripleRow(row, columnFamily, columnQualifier));
+            ryaStatement.setColumnVisibility(columnVisibility);
+            ryaStatement.setValue(value);
+            ryaStatement.setTimestamp(timestamp);
+        } catch (TripleRowResolverException e) {
+            throw new IOException(e);
+        }
+    }
+}
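The write/readFields pair above defines the record's wire format: five length-prefixed byte arrays (the SPO-layout row, column family, column qualifier, column visibility, and value), followed by an optional timestamp. A minimal standalone sketch of the round trip, not part of the patch itself; it uses only the classes shown above plus standard java.io streams, assumes the default-configuration RyaTripleContext on both sides, and the example URIs are placeholders:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import mvm.rya.accumulo.mr.RyaStatementWritable;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.domain.RyaURI;

public class RoundTripSketch {
    public static void main(String[] args) throws IOException {
        RyaStatement stmt = new RyaStatement(
                new RyaURI("http://example.com/s"),
                new RyaURI("http://example.com/p"),
                new RyaURI("http://example.com/o"));

        // Serialize the way the MapReduce framework would.
        RyaStatementWritable out = new RyaStatementWritable();
        out.setRyaStatement(stmt);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        out.write(new DataOutputStream(bytes));

        // Deserialize into a second (reusable) instance. Per the class javadoc,
        // readFields builds a fresh RyaStatement each time, so a caller keeping
        // statements across records must copy out getRyaStatement()'s result.
        RyaStatementWritable in = new RyaStatementWritable();
        in.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(in.getRyaStatement());
    }
}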
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/examples/TextOutputExample.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/examples/TextOutputExample.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/examples/TextOutputExample.java new file mode 100644 index 0000000..bc3af58 --- /dev/null +++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/examples/TextOutputExample.java @@ -0,0 +1,196 @@ +package mvm.rya.accumulo.mr.examples; + +import java.io.BufferedReader; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.io.StringWriter; +import java.nio.charset.Charset; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.util.Date; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.Logger; +import org.openrdf.model.Statement; +import org.openrdf.rio.RDFFormat; +import org.openrdf.rio.RDFHandlerException; +import org.openrdf.rio.RDFWriter; +import org.openrdf.rio.Rio; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.accumulo.mr.AbstractAccumuloMRTool; +import mvm.rya.accumulo.mr.MRUtils; +import mvm.rya.accumulo.mr.RyaStatementWritable; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.resolver.RyaToRdfConversions; + +/** + * Example of using a MapReduce tool to get triples from a Rya instance and serialize them to a text file as RDF. 
+ */
+public class TextOutputExample extends AbstractAccumuloMRTool {
+    private static Logger logger = Logger.getLogger(TextOutputExample.class);
+    private static RDFFormat rdfFormat = RDFFormat.NQUADS;
+    private static String tempDir;
+
+    // Connection information
+    private static final String USERNAME = "root";
+    private static final String PASSWORD = "";
+    private static final String INSTANCE_NAME = "instanceName";
+    private static final String PREFIX = "rya_example_";
+
+    public static void main(String[] args) throws Exception {
+        setUpRya();
+        TextOutputExample tool = new TextOutputExample();
+        ToolRunner.run(new Configuration(), tool, args);
+    }
+
+    static void setUpRya() throws AccumuloException, AccumuloSecurityException, RyaDAOException {
+        MockInstance mock = new MockInstance(INSTANCE_NAME);
+        Connector conn = mock.getConnector(USERNAME, new PasswordToken(PASSWORD));
+        AccumuloRyaDAO dao = new AccumuloRyaDAO();
+        dao.setConnector(conn);
+        AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix(PREFIX);
+        dao.setConf(conf);
+        dao.init();
+        String ns = "http://example.com/";
+        dao.add(new RyaStatement(new RyaURI(ns+"s1"), new RyaURI(ns+"p1"), new RyaURI(ns+"o1")));
+        dao.add(new RyaStatement(new RyaURI(ns+"s1"), new RyaURI(ns+"p2"), new RyaURI(ns+"o2")));
+        dao.add(new RyaStatement(new RyaURI(ns+"s2"), new RyaURI(ns+"p1"), new RyaURI(ns+"o3"),
+                new RyaURI(ns+"g1")));
+        dao.add(new RyaStatement(new RyaURI(ns+"s3"), new RyaURI(ns+"p3"), new RyaURI(ns+"o3"),
+                new RyaURI(ns+"g2")));
+        dao.destroy();
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        logger.info("Configuring tool to connect to mock instance...");
+        MRUtils.setACUserName(conf, USERNAME);
+        MRUtils.setACPwd(conf, PASSWORD);
+        MRUtils.setACInstance(conf, INSTANCE_NAME);
+        MRUtils.setACMock(conf, true);
+        MRUtils.setTablePrefix(conf, PREFIX);
+
+        logger.info("Initializing tool and checking configuration...");
+        init();
+
+        logger.info("Creating Job, setting Mapper class, and setting no Reducer...");
+        Job job = Job.getInstance(conf);
+        job.setJarByClass(TextOutputExample.class);
+        job.setMapperClass(RyaToRdfMapper.class);
+        job.setNumReduceTasks(0);
+
+        logger.info("Configuring Job to take input from the mock Rya instance...");
+        setupRyaInput(job);
+
+        logger.info("Configuring Job to output Text to a new temporary directory...");
+        tempDir = Files.createTempDirectory("rya-mr-example").toString();
+        Path outputPath = new Path(tempDir, "rdf-output");
+        job.setOutputFormatClass(TextOutputFormat.class);
+        TextOutputFormat.setOutputPath(job, outputPath);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(Text.class);
+
+        Date start = new Date();
+        logger.info("Starting Job at: " + start);
+        boolean success = job.waitForCompletion(true);
+
+        if (!success) {
+            System.out.println("Job Failed!!!");
+            return 1;
+        }
+
+        Date end = new Date();
+        logger.info("Job ended: " + end);
+        logger.info("The job took " + (end.getTime() - start.getTime()) / 1000 + " seconds.");
+        // Print output and then delete temp files:
+        java.nio.file.Path tempPath = FileSystems.getDefault().getPath(tempDir);
+        for (java.nio.file.Path subdir : Files.newDirectoryStream(tempPath)) {
+            logger.info("");
+            logger.info("Output files:");
+            for (java.nio.file.Path outputFile : Files.newDirectoryStream(subdir)) {
+                logger.info("\t" + outputFile);
+            }
+            for (java.nio.file.Path outputFile : Files.newDirectoryStream(subdir, "part*")) {
+                logger.info("");
+                logger.info("Contents of " + outputFile + ":");
+ BufferedReader reader = Files.newBufferedReader(outputFile, Charset.defaultCharset()); + String line; + while ((line = reader.readLine()) != null) { + logger.info("\t" + line); + } + reader.close(); + } + for (java.nio.file.Path outputFile : Files.newDirectoryStream(subdir)) { + Files.deleteIfExists(outputFile); + } + Files.deleteIfExists(subdir); + } + Files.deleteIfExists(tempPath); + logger.info(""); + logger.info("Temporary directory " + tempDir + " deleted."); + + return 0; + } + + static class RyaToRdfMapper extends Mapper<Text, RyaStatementWritable, NullWritable, Text> { + Text textOut = new Text(); + @Override + protected void map(Text key, RyaStatementWritable value, Context context) throws IOException, InterruptedException { + // receives a RyaStatementWritable; convert to a Statement + RyaStatement rstmt = value.getRyaStatement(); + Statement st = RyaToRdfConversions.convertStatement(rstmt); + logger.info("Mapper receives: " + rstmt); + // then convert to an RDF string + StringWriter writer = new StringWriter(); + try { + RDFWriter rdfWriter = Rio.createWriter(rdfFormat, writer); + rdfWriter.startRDF(); + rdfWriter.handleStatement(st); + rdfWriter.endRDF(); + } catch (RDFHandlerException e) { + throw new IOException("Error writing RDF data", e); + } + // Write the string to the output + String line = writer.toString().trim(); + logger.info("Serialized to RDF: " + line); + textOut.set(line); + context.write(NullWritable.get(), textOut); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/AccumuloRdfCountTool.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/AccumuloRdfCountTool.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/AccumuloRdfCountTool.java new file mode 100644 index 0000000..ee4e00b --- /dev/null +++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/AccumuloRdfCountTool.java @@ -0,0 +1,258 @@ +package mvm.rya.accumulo.mr.tools; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + + +import java.io.IOException; +import java.util.Date; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRdfConstants; +import mvm.rya.accumulo.mr.AbstractAccumuloMRTool; +import mvm.rya.accumulo.mr.MRUtils; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.resolver.RyaTripleContext; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; + +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; + +import com.google.common.collect.Lists; +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; + +/** + * Count subject, predicate, object. Save in table + * Class RdfCloudTripleStoreCountTool + * Date: Apr 12, 2011 + * Time: 10:39:40 AM + * @deprecated + */ +public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool { + + public static void main(String[] args) { + try { + + ToolRunner.run(new Configuration(), new AccumuloRdfCountTool(), args); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * cloudbase props + */ + + @Override + public int run(String[] strings) throws Exception { + conf.set(MRUtils.JOB_NAME_PROP, "Gather Evaluation Statistics"); + + //initialize + init(); + + Job job = new Job(conf); + job.setJarByClass(AccumuloRdfCountTool.class); + setupAccumuloInput(job); + + AccumuloInputFormat.setRanges(job, Lists.newArrayList(new Range(new Text(new byte[]{}), new Text(new byte[]{Byte.MAX_VALUE})))); + // set input output of the particular job + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(LongWritable.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Mutation.class); + + // set mapper and reducer classes + job.setMapperClass(CountPiecesMapper.class); + job.setCombinerClass(CountPiecesCombiner.class); + job.setReducerClass(CountPiecesReducer.class); + + String outputTable = MRUtils.getTablePrefix(conf) + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX; + setupAccumuloOutput(job, outputTable); + + // Submit the job + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int exitCode = job.waitForCompletion(true) ? 
0 : 1; + + if (exitCode == 0) { + Date end_time = new Date(); + System.out.println("Job ended: " + end_time); + System.out.println("The job took " + + (end_time.getTime() - startTime.getTime()) / 1000 + + " seconds."); + return 0; + } else { + System.out.println("Job Failed!!!"); + } + + return -1; + } + + public static class CountPiecesMapper extends Mapper<Key, Value, Text, LongWritable> { + + public static final byte[] EMPTY_BYTES = new byte[0]; + private RdfCloudTripleStoreConstants.TABLE_LAYOUT tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP; + + ValueFactoryImpl vf = new ValueFactoryImpl(); + + private Text keyOut = new Text(); + private LongWritable valOut = new LongWritable(1); + private RyaTripleContext ryaContext; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + Configuration conf = context.getConfiguration(); + tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf( + conf.get(MRUtils.TABLE_LAYOUT_PROP, RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString())); + ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf)); + } + + @Override + protected void map(Key key, Value value, Context context) throws IOException, InterruptedException { + try { + RyaStatement statement = ryaContext.deserializeTriple(tableLayout, new TripleRow(key.getRow().getBytes(), key.getColumnFamily().getBytes(), key.getColumnQualifier().getBytes())); + //count each piece subject, pred, object + + String subj = statement.getSubject().getData(); + String pred = statement.getPredicate().getData(); +// byte[] objBytes = tripleFormat.getValueFormat().serialize(statement.getObject()); + RyaURI scontext = statement.getContext(); + boolean includesContext = scontext != null; + String scontext_str = (includesContext) ? 
scontext.getData() : null; + + ByteArrayDataOutput output = ByteStreams.newDataOutput(); + output.writeUTF(subj); + output.writeUTF(RdfCloudTripleStoreConstants.SUBJECT_CF); + output.writeBoolean(includesContext); + if (includesContext) + output.writeUTF(scontext_str); + keyOut.set(output.toByteArray()); + context.write(keyOut, valOut); + + output = ByteStreams.newDataOutput(); + output.writeUTF(pred); + output.writeUTF(RdfCloudTripleStoreConstants.PRED_CF); + output.writeBoolean(includesContext); + if (includesContext) + output.writeUTF(scontext_str); + keyOut.set(output.toByteArray()); + context.write(keyOut, valOut); + } catch (TripleRowResolverException e) { + throw new IOException(e); + } + } + } + + public static class CountPiecesCombiner extends Reducer<Text, LongWritable, Text, LongWritable> { + + private LongWritable valOut = new LongWritable(); + + // TODO: can still add up to be large I guess + // any count lower than this does not need to be saved + public static final int TOO_LOW = 2; + + @Override + protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { + long count = 0; + for (LongWritable lw : values) { + count += lw.get(); + } + + if (count <= TOO_LOW) + return; + + valOut.set(count); + context.write(key, valOut); + } + + } + + public static class CountPiecesReducer extends Reducer<Text, LongWritable, Text, Mutation> { + + Text row = new Text(); + Text cat_txt = new Text(); + Value v_out = new Value(); + ValueFactory vf = new ValueFactoryImpl(); + + // any count lower than this does not need to be saved + public static final int TOO_LOW = 10; + private String tablePrefix; + protected Text table; + private ColumnVisibility cv = AccumuloRdfConstants.EMPTY_CV; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + tablePrefix = context.getConfiguration().get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF); + table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX); + final String cv_s = context.getConfiguration().get(MRUtils.AC_CV_PROP); + if (cv_s != null) + cv = new ColumnVisibility(cv_s); + } + + @Override + protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { + long count = 0; + for (LongWritable lw : values) { + count += lw.get(); + } + + if (count <= TOO_LOW) + return; + + ByteArrayDataInput badi = ByteStreams.newDataInput(key.getBytes()); + String v = badi.readUTF(); + cat_txt.set(badi.readUTF()); + + Text columnQualifier = RdfCloudTripleStoreConstants.EMPTY_TEXT; + boolean includesContext = badi.readBoolean(); + if (includesContext) { + columnQualifier = new Text(badi.readUTF()); + } + + row.set(v); + Mutation m = new Mutation(row); + v_out.set((count + "").getBytes()); + m.put(cat_txt, columnQualifier, cv, v_out); + context.write(table, m); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/RdfFileInputTool.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/RdfFileInputTool.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/RdfFileInputTool.java new file mode 100644 index 0000000..7857e45 --- /dev/null +++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/RdfFileInputTool.java @@ -0,0 +1,91 @@ +package mvm.rya.accumulo.mr.tools; + +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Date; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.openrdf.rio.RDFFormat; + +import mvm.rya.accumulo.mr.AbstractAccumuloMRTool; +import mvm.rya.accumulo.mr.MRUtils; + +/** + * Reads RDF data from one or more file(s) and inserts statements into Rya. + * <p> + * Uses {@link mvm.rya.accumulo.mr.RdfFileInputFormat} to read data. + * <p> + * Takes one argument: the file or directory to read (from HDFS). + * <p> + * Expects configuration: + * <p> + * - RDF format, named by parameter "rdf.format"; see {@link RDFFormat}. + * Defaults to rdf/xml. If using multiple files, all must be the same format. + * <p> + * - Accumulo and Rya configuration parameters as named in {@link MRUtils} + * (username, password, instance name, zookeepers, and Rya prefix) + * <p> + * - Indexing configuration parameters as named in + * {@link mvm.rya.indexing.accumulo.ConfigUtils} (enable or disable freetext, + * geo, temporal, and entity indexing, and specify predicates for each + * indexer). If not given, no secondary indexing is done. + */ +public class RdfFileInputTool extends AbstractAccumuloMRTool implements Tool { + public static void main(String[] args) { + try { + ToolRunner.run(new Configuration(), new RdfFileInputTool(), args); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public int run(String[] args) throws Exception { + init(); + Job job = Job.getInstance(conf, "Rdf File Input"); + job.setJarByClass(RdfFileInputTool.class); + + String inputPath = conf.get(MRUtils.INPUT_PATH, args[0]); + setupFileInput(job, inputPath, RDFFormat.RDFXML); + setupRyaOutput(job); + job.setNumReduceTasks(0); + + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int exitCode = job.waitForCompletion(true) ? 
0 : 1; + + if (exitCode == 0) { + Date end_time = new Date(); + System.out.println("Job ended: " + end_time); + System.out.println("The job took " + + (end_time.getTime() - startTime.getTime()) / 1000 + + " seconds."); + long n = job.getCounters() + .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue(); + System.out.println(n + " statement(s) inserted to Rya."); + } else { + System.out.println("Job Failed!!!"); + } + return exitCode; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/Upgrade322Tool.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/Upgrade322Tool.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/Upgrade322Tool.java new file mode 100644 index 0000000..d713d85 --- /dev/null +++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/Upgrade322Tool.java @@ -0,0 +1,241 @@ +package mvm.rya.accumulo.mr.tools; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + + +import mvm.rya.accumulo.mr.AbstractAccumuloMRTool; +import mvm.rya.accumulo.mr.MRUtils; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.user.RegExFilter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.calrissian.mango.types.LexiTypeEncoders; +import org.calrissian.mango.types.TypeEncoder; + +import java.io.IOException; +import java.util.Date; + +import static mvm.rya.api.RdfCloudTripleStoreConstants.*; + +/** + */ +public class Upgrade322Tool extends AbstractAccumuloMRTool implements Tool { + @Override + public int run(String[] strings) throws Exception { + conf.set(MRUtils.JOB_NAME_PROP, "Upgrade to Rya 3.2.2"); + //faster + init(); + + Job job = new Job(conf); + job.setJarByClass(Upgrade322Tool.class); + + setupAccumuloInput(job); + AccumuloInputFormat.setInputTableName(job, MRUtils.getTablePrefix(conf) + TBL_OSP_SUFFIX); + + //we do not need to change any row that is a string, custom, or uri type + IteratorSetting regex = new IteratorSetting(30, "regex", + RegExFilter.class); + RegExFilter.setRegexs(regex, "\\w*" + TYPE_DELIM + "[\u0003|\u0008|\u0002]", null, null, null, false); + RegExFilter.setNegate(regex, true); + + // set input output of the particular job + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Mutation.class); + + setupAccumuloOutput(job, MRUtils.getTablePrefix(conf) + + TBL_SPO_SUFFIX); + + // set mapper and reducer classes + job.setMapperClass(Upgrade322Mapper.class); + job.setReducerClass(Reducer.class); + + // Submit the job + return job.waitForCompletion(true) ? 
0 : 1; + } + + public static void main(String[] args) { + try { + ToolRunner.run(new Configuration(), new Upgrade322Tool(), args); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * Reading from the OSP table + */ + public static class Upgrade322Mapper extends Mapper<Key, Value, Text, Mutation> { + + private String tablePrefix; + private Text spoTable; + private Text poTable; + private Text ospTable; + + private final UpgradeObjectSerialization upgradeObjectSerialization; + + public Upgrade322Mapper() { + this(new UpgradeObjectSerialization()); + } + + public Upgrade322Mapper( + UpgradeObjectSerialization upgradeObjectSerialization) { + this.upgradeObjectSerialization = upgradeObjectSerialization; + } + + @Override + protected void setup( + Context context) throws IOException, InterruptedException { + super.setup(context); + + tablePrefix = context.getConfiguration().get( + MRUtils.TABLE_PREFIX_PROPERTY, TBL_PRFX_DEF); + spoTable = new Text(tablePrefix + TBL_SPO_SUFFIX); + poTable = new Text(tablePrefix + TBL_PO_SUFFIX); + ospTable = new Text(tablePrefix + TBL_OSP_SUFFIX); + } + + @Override + protected void map( + Key key, Value value, Context context) + throws IOException, InterruptedException { + + //read the key, expect OSP + final String row = key.getRow().toString(); + final int firstDelim = row.indexOf(DELIM); + final int secondDelim = row.indexOf(DELIM, firstDelim + 1); + final int typeDelim = row.lastIndexOf(TYPE_DELIM); + final String oldSerialization = row.substring(0, firstDelim); + char typeMarker = row.charAt(row.length() - 1); + + final String subject = row.substring(firstDelim + 1, secondDelim); + final String predicate = row.substring(secondDelim + 1, typeDelim); + final String typeSuffix = TYPE_DELIM + typeMarker; + + String newSerialization = upgradeObjectSerialization.upgrade(oldSerialization, typeMarker); + if(newSerialization == null) { + return; + } + + //write out delete Mutations + Mutation deleteOldSerialization_osp = new Mutation(key.getRow()); + deleteOldSerialization_osp.putDelete(key.getColumnFamily(), key.getColumnQualifier(), + key.getColumnVisibilityParsed()); + Mutation deleteOldSerialization_po = new Mutation(predicate + DELIM + oldSerialization + DELIM + subject + typeSuffix); + deleteOldSerialization_po.putDelete(key.getColumnFamily(), + key.getColumnQualifier(), + key.getColumnVisibilityParsed()); + Mutation deleteOldSerialization_spo = new Mutation(subject + DELIM + predicate + DELIM + oldSerialization + typeSuffix); + deleteOldSerialization_spo.putDelete(key.getColumnFamily(), key.getColumnQualifier(), + key.getColumnVisibilityParsed()); + + //write out new serialization + Mutation putNewSerialization_osp = new Mutation(newSerialization + DELIM + subject + DELIM + predicate + typeSuffix); + putNewSerialization_osp.put(key.getColumnFamily(), + key.getColumnQualifier(), + key.getColumnVisibilityParsed(), + key.getTimestamp(), value); + Mutation putNewSerialization_po = new Mutation(predicate + DELIM + newSerialization + DELIM + subject + typeSuffix); + putNewSerialization_po.put(key.getColumnFamily(), + key.getColumnQualifier(), + key.getColumnVisibilityParsed(), + key.getTimestamp(), value); + Mutation putNewSerialization_spo = new Mutation(subject + DELIM + predicate + DELIM + newSerialization + typeSuffix); + putNewSerialization_spo.put(key.getColumnFamily(), + key.getColumnQualifier(), + key.getColumnVisibilityParsed(), + key.getTimestamp(), value); + + //write out deletes to all tables + context.write(ospTable, 
deleteOldSerialization_osp); + context.write(poTable, deleteOldSerialization_po); + context.write(spoTable, deleteOldSerialization_spo); + + //write out inserts to all tables + context.write(ospTable, putNewSerialization_osp); + context.write(poTable, putNewSerialization_po); + context.write(spoTable, putNewSerialization_spo); + } + } + + public static class UpgradeObjectSerialization { + + public static final TypeEncoder<Boolean, String> + BOOLEAN_STRING_TYPE_ENCODER = LexiTypeEncoders.booleanEncoder(); + public static final TypeEncoder<Byte, String> BYTE_STRING_TYPE_ENCODER + = LexiTypeEncoders.byteEncoder(); + public static final TypeEncoder<Date, String> DATE_STRING_TYPE_ENCODER + = LexiTypeEncoders.dateEncoder(); + public static final TypeEncoder<Integer, String> + INTEGER_STRING_TYPE_ENCODER = LexiTypeEncoders.integerEncoder(); + public static final TypeEncoder<Long, String> LONG_STRING_TYPE_ENCODER + = LexiTypeEncoders.longEncoder(); + public static final TypeEncoder<Double, String> + DOUBLE_STRING_TYPE_ENCODER = LexiTypeEncoders.doubleEncoder(); + + public String upgrade(String object, int typeMarker) { + switch(typeMarker) { + case 10: //boolean + final boolean bool = Boolean.parseBoolean(object); + return BOOLEAN_STRING_TYPE_ENCODER.encode(bool); + case 9: //byte + final byte b = Byte.parseByte(object); + return BYTE_STRING_TYPE_ENCODER.encode(b); + case 4: //long + final Long lng = Long.parseLong(object); + return LONG_STRING_TYPE_ENCODER.encode(lng); + case 5: //int + final Integer i = Integer.parseInt(object); + return INTEGER_STRING_TYPE_ENCODER.encode(i); + case 6: //double + String exp = object.substring(2, 5); + char valueSign = object.charAt(0); + char expSign = object.charAt(1); + Integer expInt = Integer.parseInt(exp); + if (expSign == '-') { + expInt = 999 - expInt; + } + final String expDoubleStr = + String.format("%s%sE%s%d", valueSign, + object.substring(6), + expSign, expInt); + return DOUBLE_STRING_TYPE_ENCODER + .encode(Double.parseDouble(expDoubleStr)); + case 7: //datetime + //check to see if it is an early release that includes the exact term xsd:dateTime + final Long l = Long.MAX_VALUE - Long.parseLong(object); + Date date = new Date(l); + return DATE_STRING_TYPE_ENCODER.encode(date); + default: + return null; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/test/java/mvm/rya/accumulo/mr/RdfFileInputFormatTest.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/RdfFileInputFormatTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RdfFileInputFormatTest.java new file mode 100644 index 0000000..cda66bd --- /dev/null +++ b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RdfFileInputFormatTest.java @@ -0,0 +1,180 @@ +package mvm.rya.accumulo.mr; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; 
+import org.openrdf.model.impl.ContextStatementImpl; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.impl.StatementImpl; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.rio.RDFFormat; + +import mvm.rya.api.resolver.RyaToRdfConversions; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +public class RdfFileInputFormatTest { + static String NT_INPUT = "src/test/resources/test.ntriples"; + static String TRIG_INPUT = "src/test/resources/namedgraphs.trig"; + + Configuration conf; + Job job; + FileSystem fs; + RdfFileInputFormat.RdfFileRecordReader reader; + + @Rule + public ExpectedException expected = ExpectedException.none(); + + @Before + public void before() throws IOException { + conf = new Configuration(); + conf.set("fs.default.name", "file:///"); + fs = FileSystem.get(conf); + job = Job.getInstance(conf); + } + + void init(String filename) throws IOException, InterruptedException { + conf = job.getConfiguration(); + File inputFile = new File(filename); + Path inputPath = new Path(inputFile.getAbsoluteFile().toURI()); + InputSplit split = new FileSplit(inputPath, 0, inputFile.length(), null); + TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); + reader = (RdfFileInputFormat.RdfFileRecordReader) new RdfFileInputFormat().createRecordReader(split, context); + reader.initialize(split, context); + } + + @Test + public void testStatementInput() throws Exception { + RdfFileInputFormat.setRDFFormat(job, RDFFormat.NTRIPLES); + init(NT_INPUT); + String prefix = "urn:lubm:rdfts#"; + URI[] gs = { + new URIImpl(prefix + "GraduateStudent01"), + new URIImpl(prefix + "GraduateStudent02"), + new URIImpl(prefix + "GraduateStudent03"), + new URIImpl(prefix + "GraduateStudent04") + }; + URI hasFriend = new URIImpl(prefix + "hasFriend"); + Statement[] statements = { + new StatementImpl(gs[0], hasFriend, gs[1]), + new StatementImpl(gs[1], hasFriend, gs[2]), + new StatementImpl(gs[2], hasFriend, gs[3]) + }; + int count = 0; + while (reader.nextKeyValue()) { + Assert.assertEquals(statements[count], + RyaToRdfConversions.convertStatement(reader.getCurrentValue().getRyaStatement())); + count++; + Assert.assertEquals(count, reader.getCurrentKey().get()); + } + Assert.assertEquals(3, count); + } + + @Test + public void testTrigInput() throws Exception { + RdfFileInputFormat.setRDFFormat(job, RDFFormat.TRIG); + init(TRIG_INPUT); + Assert.assertTrue(reader.nextKeyValue()); + Assert.assertEquals(1, reader.getCurrentKey().get()); + Statement expected = new ContextStatementImpl( + new URIImpl("http://www.example.org/exampleDocument#Monica"), + new URIImpl("http://www.example.org/vocabulary#name"), + new LiteralImpl("Monica Murphy"), + new URIImpl("http://www.example.org/exampleDocument#G1")); + 
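// The named graph in the TriG file is preserved end-to-end: RdfFileInputFormat
// parses <http://www.example.org/exampleDocument#G1> into the RyaStatement's
// context, and RyaToRdfConversions restores it as a ContextStatementImpl, so
// the equality check below covers the graph as well as subject, predicate,
// and object.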
Statement actual = RyaToRdfConversions.convertStatement( + reader.getCurrentValue().getRyaStatement()); + Assert.assertEquals(expected, actual); + } + + @Test + public void testBlockStatementQueue() throws Exception { + RdfFileInputFormat.setRDFFormat(job, RDFFormat.NTRIPLES); + RdfFileInputFormat.setStatementBufferSize(job, 2); + init(NT_INPUT); + // 3 statements in total, plus done signal: should fill up three times + int interval = 100; // ms to sleep per iteration while waiting for statement cache to fill + int maxSeconds = 120; // timeout that should never be reached + for (int i = 0; i < 3; i++) { + long t = 0; + while (reader.statementCache.remainingCapacity() > 0) { + if (t >= (maxSeconds*1000)) { + Assert.fail("Statement buffer still hasn't filled up after " + maxSeconds + " seconds."); + } + Assert.assertTrue(reader.statementCache.size() <= 2); + Thread.sleep(interval); + t += interval; + } + Assert.assertEquals(2, reader.statementCache.size()); + Assert.assertEquals(0, reader.statementCache.remainingCapacity()); + Assert.assertTrue(reader.nextKeyValue()); + } + // Then the only thing in the queue should be the done signal + Assert.assertSame(RdfFileInputFormat.DONE, reader.statementCache.peek()); + Assert.assertEquals(1, reader.statementCache.size()); + Assert.assertFalse(reader.nextKeyValue()); + Assert.assertTrue(reader.statementCache.isEmpty()); + } + + @Test + public void testFailGracefully() throws Exception { + // Pass the wrong RDF format and make sure all threads terminate + int interval = 100; // ms to sleep per iteration while waiting for statement cache to fill + int maxSeconds = 60; // timeout that should never be reached + RdfFileInputFormat.setRDFFormat(job, RDFFormat.RDFXML); + RdfFileInputFormat.setTimeout(job, maxSeconds*2); + init(NT_INPUT); + long t = 0; + while (reader.statementCache.isEmpty()) { + if (t >= (maxSeconds*1000)) { + Assert.fail("Statement buffer still hasn't been sent error signal after " + maxSeconds + " seconds."); + } + Thread.sleep(interval); + t += interval; + } + Assert.assertSame(RdfFileInputFormat.ERROR, reader.statementCache.peek()); + expected.expect(IOException.class); + try { + Assert.assertFalse(reader.nextKeyValue()); + } + catch (Exception e) { + Assert.assertNull(reader.getCurrentKey()); + Assert.assertNull(reader.getCurrentValue()); + Assert.assertFalse(reader.readerThread.isAlive()); + Assert.assertFalse(reader.parserThread.isAlive()); + throw e; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java new file mode 100644 index 0000000..2755732 --- /dev/null +++ b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java @@ -0,0 +1,156 @@ +package mvm.rya.accumulo.mr; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.accumulo.RyaTableMutationsFactory; +import mvm.rya.accumulo.mr.RyaInputFormat.RyaStatementRecordReader; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.resolver.RyaTripleContext; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Mutation; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.hadoop.mrunit.mapreduce.MapDriver; +import org.apache.hadoop.mrunit.mapreduce.ReduceDriver; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class RyaInputFormatTest { + + static String username = "root", table = "rya_spo"; + static PasswordToken password = new PasswordToken(""); + + static Instance instance; + static AccumuloRyaDAO apiImpl; + + @BeforeClass + public static void init() throws Exception { + instance = new MockInstance(RyaInputFormatTest.class.getName() + ".mock_instance"); + Connector connector = instance.getConnector(username, password); + connector.tableOperations().create(table); + + AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix("rya_"); + conf.setDisplayQueryPlan(false); + + apiImpl = new AccumuloRyaDAO(); + apiImpl.setConf(conf); + apiImpl.setConnector(connector); + } + + @Before + public void before() throws Exception { + apiImpl.init(); + } + + @After + public void after() throws Exception { + apiImpl.dropAndDestroy(); + } + + @Test + public void testInputFormat() throws Exception { + + + RyaStatement input = RyaStatement.builder() + .setSubject(new RyaURI("http://www.google.com")) + .setPredicate(new RyaURI("http://some_other_uri")) + .setObject(new RyaURI("http://www.yahoo.com")) + .setColumnVisibility(new byte[0]) + .setValue(new byte[0]) + .build(); + + apiImpl.add(input); + + Job jobConf = Job.getInstance(); + + RyaInputFormat.setMockInstance(jobConf, instance.getInstanceName()); + RyaInputFormat.setConnectorInfo(jobConf, username, password); + RyaInputFormat.setTableLayout(jobConf, TABLE_LAYOUT.SPO); + 
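// RyaInputFormat builds on Accumulo's input format machinery, so the generic
// AccumuloInputFormat configuration calls below apply unchanged; setTableLayout
// above tells the RyaStatementRecordReader which table layout (SPO here) to use
// when decoding each Accumulo entry back into a RyaStatement.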
+        AccumuloInputFormat.setInputTableName(jobConf, table);
+        AccumuloInputFormat.setScanIsolation(jobConf, false);
+        AccumuloInputFormat.setLocalIterators(jobConf, false);
+        AccumuloInputFormat.setOfflineTableScan(jobConf, false);
+
+        RyaInputFormat inputFormat = new RyaInputFormat();
+
+        JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
+
+        List<InputSplit> splits = inputFormat.getSplits(context);
+
+        Assert.assertEquals(1, splits.size());
+
+        TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1));
+
+        RecordReader<Text, RyaStatementWritable> reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
+
+        RyaStatementRecordReader ryaStatementRecordReader = (RyaStatementRecordReader)reader;
+        ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
+
+        List<RyaStatement> results = new ArrayList<RyaStatement>();
+        while(ryaStatementRecordReader.nextKeyValue()) {
+            RyaStatementWritable writable = ryaStatementRecordReader.getCurrentValue();
+            RyaStatement value = writable.getRyaStatement();
+            Text text = ryaStatementRecordReader.getCurrentKey();
+            RyaStatement stmt = RyaStatement.builder()
+                .setSubject(value.getSubject())
+                .setPredicate(value.getPredicate())
+                .setObject(value.getObject())
+                .setContext(value.getContext())
+                .setQualifier(value.getQualifer())
+                .setColumnVisibility(value.getColumnVisibility())
+                .setValue(value.getValue())
+                .build();
+            results.add(stmt);
+
+            System.out.println(text);
+            System.out.println(value);
+        }
+
+        Assert.assertTrue(results.size() == 2);
+        Assert.assertTrue(results.contains(input));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaOutputFormatTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaOutputFormatTest.java
new file mode 100644
index 0000000..a48afa3
--- /dev/null
+++ b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaOutputFormatTest.java
@@ -0,0 +1,324 @@
+package mvm.rya.accumulo.mr;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.GeometryFactory;
+import com.vividsolutions.jts.geom.Point;
+import com.vividsolutions.jts.geom.PrecisionModel;
+
+import info.aduna.iteration.CloseableIteration;
+import
mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.resolver.RdfToRyaConversions; +import mvm.rya.api.resolver.RyaToRdfConversions; +import mvm.rya.api.resolver.RyaTripleContext; +import mvm.rya.indexing.StatementConstraints; +import mvm.rya.indexing.TemporalInstant; +import mvm.rya.indexing.TemporalInstantRfc3339; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.accumulo.entity.EntityCentricIndex; +import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; +import mvm.rya.indexing.accumulo.freetext.SimpleTokenizer; +import mvm.rya.indexing.accumulo.freetext.Tokenizer; +import mvm.rya.indexing.accumulo.geo.GeoConstants; +import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer; +import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +public class RyaOutputFormatTest { + private static final String CV = "test_auth"; + private static final String GRAPH = "http://example.org/graph"; + private static final String USERNAME = "root"; + private static final String PASSWORD = ""; + private static final String INSTANCE_NAME = RyaOutputFormatTest.class.getSimpleName() + ".rya_output"; + private static final String PREFIX = "ryaoutputformattest_"; + + MockInstance instance; + Connector connector; + AccumuloRyaDAO dao; + AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + Job job; + RyaTripleContext ryaContext; + + @Before + public void init() throws Exception { + MRUtils.setACMock(conf, true); + MRUtils.setACInstance(conf, INSTANCE_NAME); + MRUtils.setACUserName(conf, USERNAME); + MRUtils.setACPwd(conf, PASSWORD); + MRUtils.setTablePrefix(conf, PREFIX); + conf.setTablePrefix(PREFIX); + conf.setAuths(CV); + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, INSTANCE_NAME); + conf.set(ConfigUtils.CLOUDBASE_USER, USERNAME); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, PASSWORD); + conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true); + conf.setClass(ConfigUtils.TOKENIZER_CLASS, SimpleTokenizer.class, Tokenizer.class); + ryaContext = RyaTripleContext.getInstance(conf); + instance = new MockInstance(INSTANCE_NAME); + connector = instance.getConnector(USERNAME, new PasswordToken(PASSWORD)); + job = Job.getInstance(conf); + RyaOutputFormat.setMockInstance(job, instance.getInstanceName()); + AccumuloOutputFormat.setConnectorInfo(job, USERNAME, new PasswordToken(PASSWORD)); + AccumuloOutputFormat.setCreateTables(job, true); + AccumuloOutputFormat.setDefaultTableName(job, PREFIX + "default"); + RyaOutputFormat.setTablePrefix(job, PREFIX); + } + + private void write(RyaStatement... 
+        RecordWriter<Writable, RyaStatementWritable> writer =
+                new RyaOutputFormat.RyaRecordWriter(job.getConfiguration());
+        for (RyaStatement rstmt : input) {
+            RyaStatementWritable rsw = new RyaStatementWritable(rstmt, ryaContext);
+            writer.write(new Text("unused"), rsw);
+        }
+        writer.close(null);
+    }
+
+    @Test
+    public void testOutputFormat() throws Exception {
+        RyaStatement input = RyaStatement.builder()
+                .setSubject(new RyaURI("http://www.google.com"))
+                .setPredicate(new RyaURI("http://some_other_uri"))
+                .setObject(new RyaURI("http://www.yahoo.com"))
+                .setColumnVisibility(CV.getBytes())
+                .setValue(new byte[0])
+                .setContext(new RyaURI(GRAPH))
+                .build();
+        RyaOutputFormat.setCoreTablesEnabled(job, true);
+        RyaOutputFormat.setFreeTextEnabled(job, false);
+        RyaOutputFormat.setTemporalEnabled(job, false);
+        RyaOutputFormat.setGeoEnabled(job, false);
+        RyaOutputFormat.setEntityEnabled(job, false);
+        write(input);
+        TestUtils.verify(connector, conf, input);
+    }
+
+    @Test
+    public void testDefaultCV() throws Exception {
+        RyaStatement input = RyaStatement.builder()
+                .setSubject(new RyaURI("http://www.google.com"))
+                .setPredicate(new RyaURI("http://some_other_uri"))
+                .setObject(new RyaURI("http://www.yahoo.com"))
+                .setValue(new byte[0])
+                .setContext(new RyaURI(GRAPH))
+                .build();
+        RyaStatement expected = RyaStatement.builder()
+                .setSubject(new RyaURI("http://www.google.com"))
+                .setPredicate(new RyaURI("http://some_other_uri"))
+                .setObject(new RyaURI("http://www.yahoo.com"))
+                .setValue(new byte[0])
+                .setContext(new RyaURI(GRAPH))
+                .setColumnVisibility(CV.getBytes())
+                .build();
+        RyaOutputFormat.setCoreTablesEnabled(job, true);
+        RyaOutputFormat.setFreeTextEnabled(job, false);
+        RyaOutputFormat.setTemporalEnabled(job, false);
+        RyaOutputFormat.setGeoEnabled(job, false);
+        RyaOutputFormat.setEntityEnabled(job, false);
+        RyaOutputFormat.setDefaultVisibility(job, CV);
+        write(input);
+        TestUtils.verify(connector, conf, expected);
+    }
+
+    @Test
+    public void testDefaultGraph() throws Exception {
+        RyaStatement input = RyaStatement.builder()
+                .setSubject(new RyaURI("http://www.google.com"))
+                .setPredicate(new RyaURI("http://some_other_uri"))
+                .setObject(new RyaURI("http://www.yahoo.com"))
+                .setValue(new byte[0])
+                .setColumnVisibility(CV.getBytes())
+                .build();
+        RyaStatement expected = RyaStatement.builder()
+                .setSubject(new RyaURI("http://www.google.com"))
+                .setPredicate(new RyaURI("http://some_other_uri"))
+                .setObject(new RyaURI("http://www.yahoo.com"))
+                .setValue(new byte[0])
+                .setColumnVisibility(CV.getBytes())
+                .setContext(new RyaURI(GRAPH))
+                .build();
+        RyaOutputFormat.setCoreTablesEnabled(job, true);
+        RyaOutputFormat.setFreeTextEnabled(job, false);
+        RyaOutputFormat.setTemporalEnabled(job, false);
+        RyaOutputFormat.setGeoEnabled(job, false);
+        RyaOutputFormat.setEntityEnabled(job, false);
+        RyaOutputFormat.setDefaultContext(job, GRAPH);
+        write(input);
+        TestUtils.verify(connector, conf, expected);
+    }
+
+    @Test
+    public void testFreeTextIndexing() throws Exception {
+        AccumuloFreeTextIndexer ft = new AccumuloFreeTextIndexer();
+        ft.setConf(conf);
+        RyaStatement input = RyaStatement.builder()
+                .setSubject(new RyaURI(GRAPH + ":s"))
+                .setPredicate(new RyaURI(GRAPH + ":p"))
+                .setObject(new RyaType(XMLSchema.STRING, "one two three four five"))
+                .build();
+        RyaOutputFormat.setCoreTablesEnabled(job, false);
+        RyaOutputFormat.setFreeTextEnabled(job, true);
+        RyaOutputFormat.setTemporalEnabled(job, false);
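+        // Geo and entity indexing likewise stay disabled, so only the free-text
+        // index should receive this statement.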
+        RyaOutputFormat.setGeoEnabled(job, false);
+        RyaOutputFormat.setEntityEnabled(job, false);
+        write(input);
+        Set<Statement> empty = new HashSet<>();
+        Set<Statement> expected = new HashSet<>();
+        expected.add(RyaToRdfConversions.convertStatement(input));
+        Assert.assertEquals(expected, getSet(ft.queryText("one", new StatementConstraints())));
+        Assert.assertEquals(empty, getSet(ft.queryText("!two", new StatementConstraints())));
+        Assert.assertEquals(expected, getSet(ft.queryText("*r", new StatementConstraints())));
+        Assert.assertEquals(empty, getSet(ft.queryText("r*", new StatementConstraints())));
+        Assert.assertEquals(expected, getSet(ft.queryText("!r*", new StatementConstraints())));
+        Assert.assertEquals(expected, getSet(ft.queryText("t* & !s*", new StatementConstraints())));
+        ft.close();
+    }
+
+    @Test
+    public void testTemporalIndexing() throws Exception {
+        // Note: the last two instants are identical, so two statements will share that time.
+        TemporalInstant[] instants = {
+                new TemporalInstantRfc3339(2015, 12, 30, 12, 0, 1),
+                new TemporalInstantRfc3339(2015, 12, 30, 12, 0, 2),
+                new TemporalInstantRfc3339(2015, 12, 30, 12, 0, 3),
+                new TemporalInstantRfc3339(2015, 12, 30, 12, 0, 3)
+        };
+        Statement[] statements = new Statement[instants.length];
+        RyaOutputFormat.setCoreTablesEnabled(job, false);
+        RyaOutputFormat.setFreeTextEnabled(job, false);
+        RyaOutputFormat.setTemporalEnabled(job, true);
+        RyaOutputFormat.setGeoEnabled(job, false);
+        RyaOutputFormat.setEntityEnabled(job, false);
+        ValueFactory vf = new ValueFactoryImpl();
+        for (int i = 0; i < instants.length; i++) {
+            RyaType time = RdfToRyaConversions.convertLiteral(vf.createLiteral(instants[i].toString()));
+            RyaStatement input = RyaStatement.builder()
+                    .setSubject(new RyaURI(GRAPH + ":s"))
+                    .setPredicate(new RyaURI(GRAPH + ":p"))
+                    .setObject(time)
+                    .build();
+            write(input);
+            statements[i] = RyaToRdfConversions.convertStatement(input);
+        }
+        AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer();
+        temporal.setConf(conf);
+        Set<Statement> empty = new HashSet<>();
+        Set<Statement> head = new HashSet<>();
+        Set<Statement> tail = new HashSet<>();
+        head.add(statements[0]);
+        tail.add(statements[2]);
+        tail.add(statements[3]);
+        Assert.assertEquals(empty, getSet(temporal.queryInstantBeforeInstant(instants[0], new StatementConstraints())));
+        Assert.assertEquals(empty, getSet(temporal.queryInstantAfterInstant(instants[3], new StatementConstraints())));
+        Assert.assertEquals(head, getSet(temporal.queryInstantBeforeInstant(instants[1], new StatementConstraints())));
+        Assert.assertEquals(tail, getSet(temporal.queryInstantAfterInstant(instants[1], new StatementConstraints())));
+        temporal.close();
+    }
+
+    @Test
+    public void testGeoIndexing() throws Exception {
+        GeometryFactory gf = new GeometryFactory(new PrecisionModel(), 4326);
+        Point p1 = gf.createPoint(new Coordinate(1, 1));
+        Point p2 = gf.createPoint(new Coordinate(2, 2));
+        GeoMesaGeoIndexer geo = new GeoMesaGeoIndexer();
+        geo.setConf(conf);
+        RyaStatement input = RyaStatement.builder()
+                .setSubject(new RyaURI(GRAPH + ":s"))
+                .setPredicate(new RyaURI(GRAPH + ":p"))
+                .setObject(new RyaType(GeoConstants.XMLSCHEMA_OGC_WKT, "Point(2 2)"))
+                .build();
+        RyaOutputFormat.setCoreTablesEnabled(job, false);
+        RyaOutputFormat.setFreeTextEnabled(job, false);
+        RyaOutputFormat.setTemporalEnabled(job, false);
+        RyaOutputFormat.setGeoEnabled(job, true);
+        RyaOutputFormat.setEntityEnabled(job, false);
+        write(input);
+        Set<Statement> expected = new HashSet<>();
+        Assert.assertEquals(expected, getSet(geo.queryContains(p1, new StatementConstraints())));
+        expected.add(RyaToRdfConversions.convertStatement(input));
+        Assert.assertEquals(expected, getSet(geo.queryEquals(p2, new StatementConstraints())));
+        geo.close();
+    }
+
+    @Test
+    public void testEntityIndexing() throws Exception {
+        EntityCentricIndex entity = new EntityCentricIndex();
+        entity.setConf(conf);
+        RyaStatement input = RyaStatement.builder()
+                .setSubject(new RyaURI(GRAPH + ":s"))
+                .setPredicate(new RyaURI(GRAPH + ":p"))
+                .setObject(new RyaURI(GRAPH + ":o"))
+                .build();
+        RyaOutputFormat.setCoreTablesEnabled(job, false);
+        RyaOutputFormat.setFreeTextEnabled(job, false);
+        RyaOutputFormat.setTemporalEnabled(job, false);
+        RyaOutputFormat.setGeoEnabled(job, false);
+        RyaOutputFormat.setEntityEnabled(job, true);
+        write(input);
+        entity.close();
+        Set<Statement> expected = new HashSet<>();
+        Set<Statement> inserted = new HashSet<>();
+        expected.add(RyaToRdfConversions.convertStatement(input));
+        String table = ConfigUtils.getEntityTableName(conf);
+        Scanner scanner = connector.createScanner(table, new Authorizations(CV));
+        for (Map.Entry<Key, Value> row : scanner) {
+            inserted.add(RyaToRdfConversions.convertStatement(
+                    EntityCentricIndex.deserializeStatement(row.getKey(), row.getValue())));
+        }
+        Assert.assertEquals(expected, inserted);
+    }
+
+    private static <X> Set<X> getSet(CloseableIteration<X, ?> iter) throws Exception {
+        Set<X> set = new HashSet<>();
+        while (iter.hasNext()) {
+            set.add(iter.next());
+        }
+        return set;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaStatementWritableTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaStatementWritableTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaStatementWritableTest.java
new file mode 100644
index 0000000..8bebdf4
--- /dev/null
+++ b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaStatementWritableTest.java
@@ -0,0 +1,146 @@
+package mvm.rya.accumulo.mr;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.openrdf.model.vocabulary.XMLSchema;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.RyaTripleContext;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class RyaStatementWritableTest {
+    private static final RyaURI s1 = new RyaURI(":s");
+    private static final RyaURI p1 = new RyaURI(":p");
+    private static final RyaType o1 = new RyaType(XMLSchema.INTEGER, "123");
+    private static final RyaURI s2 = new RyaURI(":s2");
+    private static final RyaURI p2 = new RyaURI(":p2");
+    private static final RyaType o2 = new RyaType(XMLSchema.STRING, "123");
+    private static final RyaURI graph1 = new RyaURI("http://example.org/graph1");
+    private static final RyaURI graph2 = new RyaURI("http://example.org/graph2");
+    private static final byte[] cv1 = "test_visibility".getBytes();
+    private static final long t1 = 1000;
+    private static final long t2 = 1001;
+    private static final RyaStatement rs1 = RyaStatement.builder().setSubject(s1).setPredicate(p1).setObject(o1)
+            .setContext(graph1).setColumnVisibility(cv1).setQualifier("q1").setTimestamp(t1).build();
+    // Equivalent:
+    private static final RyaStatement rs1b = RyaStatement.builder().setSubject(s1).setPredicate(p1).setObject(o1)
+            .setContext(graph1).setColumnVisibility(cv1).setQualifier("q1").setTimestamp(t1).build();
+    // Differ in one way each:
+    private static final RyaStatement rsGraph = RyaStatement.builder().setSubject(s1).setPredicate(p1).setObject(o1)
+            .setContext(graph2).setColumnVisibility(cv1).setQualifier("q1").setTimestamp(t1).build();
+    private static final RyaStatement rsCv = RyaStatement.builder().setSubject(s1).setPredicate(p1).setObject(o1)
+            .setContext(graph1).setColumnVisibility(null).setQualifier("q1").setTimestamp(t1).build();
+    private static final RyaStatement rsQualifier = RyaStatement.builder().setSubject(s1).setPredicate(p1).setObject(o1)
+            .setContext(graph1).setColumnVisibility(cv1).setQualifier("q2").setTimestamp(t1).build();
+    private static final RyaStatement rsTimestamp = RyaStatement.builder().setSubject(s1).setPredicate(p1).setObject(o1)
+            .setContext(graph1).setColumnVisibility(cv1).setQualifier("q1").setTimestamp(t2).build();
+    // Different triple:
+    private static final RyaStatement rs2 = RyaStatement.builder().setSubject(s2).setPredicate(p2).setObject(o2)
+            .setContext(graph1).setColumnVisibility(null).setQualifier("q1").setTimestamp(t1).build();
+
+    private static final RyaTripleContext ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration());
+
+    @Rule
+    public ExpectedException expected = ExpectedException.none();
+
+    @Test
+    public void testEquals() throws Exception {
+        RyaStatementWritable rsw1 = new RyaStatementWritable(rs1, ryaContext);
+        RyaStatementWritable rsw1b = new RyaStatementWritable(rs1b, null);
+        RyaStatementWritable rsw2 = new RyaStatementWritable(rs2, ryaContext);
+        RyaStatementWritable rswNull = new RyaStatementWritable(null, ryaContext);
+        Assert.assertEquals("Equivalent statements should be equal", rsw1, rsw1b);
+        Assert.assertFalse("equals(null) should be false", rsw1.equals(null));
+        Assert.assertNotEquals("Statements representing different triples are not equal", rsw1, rsw2);
+        Assert.assertNotEquals("Statements representing different triples are not equal", rsw1, rswNull);
+        Assert.assertNotEquals("Statements with different named graphs are not equal", rsw1,
+                new RyaStatementWritable(rsGraph, ryaContext));
+        Assert.assertNotEquals("Statements with different column visibilities are not equal", rsw1,
+                new RyaStatementWritable(rsCv, ryaContext));
+        Assert.assertNotEquals("Statements with different column qualifiers are not equal", rsw1,
+                new RyaStatementWritable(rsQualifier, ryaContext));
Assert.assertNotEquals("Statements with different timestamps are not equal", rsw1, + new RyaStatementWritable(rsTimestamp, ryaContext)); + } + + @Test + public void testCompareTo() throws Exception { + RyaStatementWritable rsw1 = new RyaStatementWritable(rs1, ryaContext); + RyaStatementWritable rsw1b = new RyaStatementWritable(rs1b, null); + RyaStatementWritable rsw2 = new RyaStatementWritable(rs2, null); + RyaStatementWritable rswGraph = new RyaStatementWritable(rsCv, ryaContext); + RyaStatementWritable rswCv = new RyaStatementWritable(rsCv, ryaContext); + RyaStatementWritable rswQualifier = new RyaStatementWritable(rsQualifier, ryaContext); + RyaStatementWritable rswTimestamp = new RyaStatementWritable(rsTimestamp, ryaContext); + Assert.assertEquals("x.compareTo(x) should always return 0", 0, rsw1.compareTo(rsw1)); + Assert.assertEquals("x.compareTo(x') where x and x' are equal should return 0", 0, rsw1.compareTo(rsw1b)); + Assert.assertEquals("x.compareTo(x') where x and x' are equal should return 0", 0, rsw1b.compareTo(rsw1)); + Assert.assertNotEquals("Statements with different named graphs are not equal", 0, rsw1.compareTo(rswGraph)); + Assert.assertNotEquals("Statements with different column visibilities are not equal", 0, rsw1.compareTo(rswCv)); + Assert.assertNotEquals("Statements with different column qualifiers are not equal", 0, rsw1.compareTo(rswQualifier)); + Assert.assertNotEquals("Statements with different timestamps are not equal", 0, rsw1.compareTo(rswTimestamp)); + Assert.assertEquals("compareTo in opposite directions should yield opposite signs", + Integer.signum(rsw1.compareTo(rsw2))*-1, Integer.signum(rsw2.compareTo(rsw1))); + // cycles shouldn't be possible; these comparisons can't all be negative or all be positive: + int x = Integer.signum(rsw1.compareTo(rsw2)) + + Integer.signum(rsw2.compareTo(rsw1b)) + + Integer.signum(rsw1b.compareTo(rsw1)); + Assert.assertNotEquals("compareTo cycle detected", 3, Math.abs(x)); + // compareTo(null) should always throw an exception: + expected.expect(NullPointerException.class); + rsw1.compareTo(null); + } + + @Test + public void testSerializeDeserialize() throws Exception { + RyaStatementWritable rsw1 = new RyaStatementWritable(rs1, ryaContext); + RyaStatementWritable rsw2 = new RyaStatementWritable(rs2, ryaContext); + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + DataOutputStream bytesOut = new DataOutputStream(bytes); + rsw1.write(bytesOut); + rsw2.write(bytesOut); + DataInputStream bytesIn = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())); + RyaStatementWritable deserialized = new RyaStatementWritable(); + // Verify initial deserialization: + deserialized.readFields(bytesIn); + Assert.assertEquals("Deserialized statement not equal to original", rsw1, deserialized); + Assert.assertEquals("Deserialized statement has different hash code", rsw1.hashCode(), deserialized.hashCode()); + Assert.assertEquals("original.compareTo(deserialized) should equal 0", 0, rsw1.compareTo(deserialized)); + // Verify that a second read mutates the Writable object into the correct second record: + RyaStatement deserializedStatement = deserialized.getRyaStatement(); + deserialized.readFields(bytesIn); + Assert.assertEquals("Deserialized statement not overwritten on second read", rsw2, deserialized); + // Verify that the internal RyaStatement object is recreated, not overwritten: + RyaStatement deserializedStatement2 = deserialized.getRyaStatement(); + Assert.assertNotSame("Reading a second record should create a new 
internal RyaStatement", + deserializedStatement, deserializedStatement2); + } +} \ No newline at end of file
