http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TriGWriterFactory.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TriGWriterFactory.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TriGWriterFactory.java new file mode 100644 index 0000000..6d8b08a --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TriGWriterFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jena.hadoop.rdf.io.registry.writers; + +import java.io.IOException; +import java.io.Writer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.jena.hadoop.rdf.io.output.writers.StreamRdfQuadWriter; +import org.apache.jena.hadoop.rdf.types.QuadWritable; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.writer.WriterStreamRDFBlocks; + +/** + * + */ +public class TriGWriterFactory extends AbstractQuadsOnlyWriterFactory { + + public TriGWriterFactory() { + super(Lang.TRIG); + } + + @Override + public <TKey> RecordWriter<TKey, QuadWritable> createQuadWriter(Writer writer, Configuration config) + throws IOException { + return new StreamRdfQuadWriter<TKey>(new WriterStreamRDFBlocks(writer), writer); + } + +}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TriXWriterFactory.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TriXWriterFactory.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TriXWriterFactory.java new file mode 100644 index 0000000..0e1b7b2 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TriXWriterFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jena.hadoop.rdf.io.registry.writers; + +import java.io.IOException; +import java.io.Writer; +import java.nio.charset.Charset; + +import org.apache.commons.io.output.WriterOutputStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.jena.hadoop.rdf.io.output.writers.StreamRdfQuadWriter; +import org.apache.jena.hadoop.rdf.types.QuadWritable; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.writer.StreamWriterTriX; + +/** + * + */ +public class TriXWriterFactory extends AbstractQuadsOnlyWriterFactory { + + public TriXWriterFactory() { + super(Lang.TRIX); + } + + @Override + public <TKey> RecordWriter<TKey, QuadWritable> createQuadWriter(Writer writer, Configuration config) + throws IOException { + return new StreamRdfQuadWriter<>(new StreamWriterTriX(new WriterOutputStream(writer, Charset.forName("utf-8"))), writer); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TurtleWriterFactory.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TurtleWriterFactory.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TurtleWriterFactory.java new file mode 100644 index 0000000..c837f12 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TurtleWriterFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jena.hadoop.rdf.io.registry.writers; + +import java.io.IOException; +import java.io.Writer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.jena.hadoop.rdf.io.output.writers.StreamRdfTripleWriter; +import org.apache.jena.hadoop.rdf.types.TripleWritable; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.writer.WriterStreamRDFBlocks; + +/** + * + */ +public class TurtleWriterFactory extends AbstractTriplesOnlyWriterFactory { + + public TurtleWriterFactory() { + super(Lang.TURTLE, Lang.TTL, Lang.N3); + } + + @Override + public <TKey> RecordWriter<TKey, TripleWritable> createTripleWriter(Writer writer, Configuration config) + throws IOException { + return new StreamRdfTripleWriter<>(new WriterStreamRDFBlocks(writer), writer); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/resources/META-INF/services/org.apache.jena.hadoop.rdf.io.registry.ReaderFactory ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/resources/META-INF/services/org.apache.jena.hadoop.rdf.io.registry.ReaderFactory b/jena-elephas/jena-elephas-io/src/main/resources/META-INF/services/org.apache.jena.hadoop.rdf.io.registry.ReaderFactory new file mode 100644 index 0000000..ec0e48a --- /dev/null +++ 
b/jena-elephas/jena-elephas-io/src/main/resources/META-INF/services/org.apache.jena.hadoop.rdf.io.registry.ReaderFactory @@ -0,0 +1,10 @@ +# Default Reader Factory implementations +org.apache.jena.hadoop.rdf.io.registry.readers.JsonLDReaderFactory +org.apache.jena.hadoop.rdf.io.registry.readers.NQuadsReaderFactory +org.apache.jena.hadoop.rdf.io.registry.readers.NTriplesReaderFactory +org.apache.jena.hadoop.rdf.io.registry.readers.RdfJsonReaderFactory +org.apache.jena.hadoop.rdf.io.registry.readers.RdfXmlReaderFactory +org.apache.jena.hadoop.rdf.io.registry.readers.ThriftReaderFactory +org.apache.jena.hadoop.rdf.io.registry.readers.TriGReaderFactory +org.apache.jena.hadoop.rdf.io.registry.readers.TriXReaderFactory +org.apache.jena.hadoop.rdf.io.registry.readers.TurtleReaderFactory \ No newline at end of file http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/resources/META-INF/services/org.apache.jena.hadoop.rdf.io.registry.WriterFactory ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/resources/META-INF/services/org.apache.jena.hadoop.rdf.io.registry.WriterFactory b/jena-elephas/jena-elephas-io/src/main/resources/META-INF/services/org.apache.jena.hadoop.rdf.io.registry.WriterFactory new file mode 100644 index 0000000..164880d --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/resources/META-INF/services/org.apache.jena.hadoop.rdf.io.registry.WriterFactory @@ -0,0 +1,10 @@ +# Default Writer Factory implementations +org.apache.jena.hadoop.rdf.io.registry.writers.JsonLDWriterFactory +org.apache.jena.hadoop.rdf.io.registry.writers.NQuadsWriterFactory +org.apache.jena.hadoop.rdf.io.registry.writers.NTriplesWriterFactory +org.apache.jena.hadoop.rdf.io.registry.writers.RdfJsonWriterFactory +org.apache.jena.hadoop.rdf.io.registry.writers.RdfXmlWriterFactory +org.apache.jena.hadoop.rdf.io.registry.writers.ThriftWriterFactory 
+org.apache.jena.hadoop.rdf.io.registry.writers.TriGWriterFactory +org.apache.jena.hadoop.rdf.io.registry.writers.TriXWriterFactory +org.apache.jena.hadoop.rdf.io.registry.writers.TurtleWriterFactory \ No newline at end of file http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/RdfTriplesInputTestMapper.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/RdfTriplesInputTestMapper.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/RdfTriplesInputTestMapper.java new file mode 100644 index 0000000..5762fb7 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/RdfTriplesInputTestMapper.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io; + +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.jena.hadoop.rdf.types.TripleWritable; +import org.apache.log4j.Logger; + + +/** + * A test mapper which takes in line based RDF triple input and just produces triples + * + * + */ +public class RdfTriplesInputTestMapper extends Mapper<LongWritable, TripleWritable, NullWritable, TripleWritable> { + + private static final Logger LOG = Logger.getLogger(RdfTriplesInputTestMapper.class); + + @Override + protected void map(LongWritable key, TripleWritable value, Context context) + throws IOException, InterruptedException { + LOG.info("Line " + key.toString() + " => " + value.toString()); + context.write(NullWritable.get(), value); + } + + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedQuadInputFormatTests.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedQuadInputFormatTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedQuadInputFormatTests.java new file mode 100644 index 0000000..1cda0bd --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedQuadInputFormatTests.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input; + +/** + * Abstract tests for blocked quad input formats + * + * + * + */ +public abstract class AbstractBlockedQuadInputFormatTests extends AbstractWholeFileQuadInputFormatTests { + + @Override + protected boolean canSplitInputs() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedTripleInputFormatTests.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedTripleInputFormatTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedTripleInputFormatTests.java new file mode 100644 index 0000000..2e1e865 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedTripleInputFormatTests.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input; + +/** + * Abstract tests for blocked triple input formats + * + * + * + */ +public abstract class AbstractBlockedTripleInputFormatTests extends AbstractWholeFileTripleInputFormatTests { + + @Override + protected boolean canSplitInputs() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java new file mode 100644 index 0000000..e22650f --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java @@ -0,0 +1,612 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.jena.hadoop.rdf.io.HadoopIOConstants; +import org.apache.jena.hadoop.rdf.io.RdfIOConstants; +import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract node tuple input format tests + * + * + * + * @param <TValue> + * @param <T> + */ +public abstract class 
AbstractNodeTupleInputFormatTests<TValue, T extends AbstractNodeTupleWritable<TValue>> { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeTupleInputFormatTests.class); + + protected static final int EMPTY_SIZE = 0, SMALL_SIZE = 100, LARGE_SIZE = 10000, BAD_SIZE = 100, MIXED_SIZE = 100; + protected static final String EMPTY = "empty"; + protected static final String SMALL = "small"; + protected static final String LARGE = "large"; + protected static final String BAD = "bad"; + protected static final String MIXED = "mixed"; + + /** + * Temporary folder for the tests + */ + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + protected File empty, small, large, bad, mixed; + + /** + * Prepares the inputs for the tests + * + * @throws IOException + */ + @Before + public void beforeTest() throws IOException { + this.prepareInputs(); + } + + /** + * Cleans up the inputs after each test + */ + @After + public void afterTest() { + // Should be unnecessary since JUnit will clean up the temporary folder + // anyway but best to do this regardless + if (empty != null) + empty.delete(); + if (small != null) + small.delete(); + if (large != null) + large.delete(); + if (bad != null) + bad.delete(); + if (mixed != null) + mixed.delete(); + } + + /** + * Prepares a fresh configuration + * + * @return Configuration + */ + protected Configuration prepareConfiguration() { + Configuration config = new Configuration(true); + // Nothing else to do + return config; + } + + /** + * Prepares the inputs + * + * @throws IOException + */ + protected void prepareInputs() throws IOException { + String ext = this.getFileExtension(); + empty = folder.newFile(EMPTY + ext); + this.generateTuples(empty, EMPTY_SIZE); + small = folder.newFile(SMALL + ext); + this.generateTuples(small, SMALL_SIZE); + large = folder.newFile(LARGE + ext); + this.generateTuples(large, LARGE_SIZE); + bad = folder.newFile(BAD + ext); + this.generateBadTuples(bad, BAD_SIZE); + mixed = 
folder.newFile(MIXED + ext); + this.generateMixedTuples(mixed, MIXED_SIZE); + } + + /** + * Gets the extra file extension to add to the filenames + * + * @return File extension + */ + protected abstract String getFileExtension(); + + /** + * Generates tuples used for tests + * + * @param f + * File + * @param num + * Number of tuples to generate + * @throws IOException + */ + protected final void generateTuples(File f, int num) throws IOException { + this.generateTuples(this.getOutputStream(f), num); + } + + /** + * Gets the output stream to use for generating tuples + * + * @param f + * File + * @return Output Stream + * @throws IOException + */ + protected OutputStream getOutputStream(File f) throws IOException { + return new FileOutputStream(f, false); + } + + /** + * Generates tuples used for tests + * + * @param output + * Output Stream to write to + * @param num + * Number of tuples to generate + * @throws IOException + */ + protected abstract void generateTuples(OutputStream output, int num) throws IOException; + + /** + * Generates bad tuples used for tests + * + * @param f + * File + * @param num + * Number of bad tuples to generate + * @throws IOException + */ + protected final void generateBadTuples(File f, int num) throws IOException { + this.generateBadTuples(this.getOutputStream(f), num); + } + + /** + * Generates bad tuples used for tests + * + * @param output + * Output Stream to write to + * @param num + * Number of bad tuples to generate + * @throws IOException + */ + protected abstract void generateBadTuples(OutputStream output, int num) throws IOException; + + /** + * Generates a mixture of good and bad tuples used for tests + * + * @param f + * File + * @param num + * Number of tuples to generate, they should be a 50/50 mix of + * good and bad tuples + * @throws IOException + */ + protected final void generateMixedTuples(File f, int num) throws IOException { + this.generateMixedTuples(this.getOutputStream(f), num); + } + + /** + * Generates a 
mixture of good and bad tuples used for tests + * + * @param output + * Output Stream to write to + * @param num + * Number of tuples to generate, they should be a 50/50 mix of + * good and bad tuples + * @throws IOException + */ + protected abstract void generateMixedTuples(OutputStream output, int num) throws IOException; + + /** + * Adds an input path to the job configuration + * + * @param f + * File + * @param config + * Configuration + * @param job + * Job + * @throws IOException + */ + protected void addInputPath(File f, Configuration config, Job job) throws IOException { + FileSystem fs = FileSystem.getLocal(config); + Path inputPath = fs.makeQualified(new Path(f.getAbsolutePath())); + FileInputFormat.addInputPath(job, inputPath); + } + + protected final int countTuples(RecordReader<LongWritable, T> reader) throws IOException, InterruptedException { + int count = 0; + + // Check initial progress + LOG.info(String.format("Initial Reported Progress %f", reader.getProgress())); + float progress = reader.getProgress(); + if (Float.compare(0.0f, progress) == 0) { + Assert.assertEquals(0.0d, reader.getProgress(), 0.0d); + } else if (Float.compare(1.0f, progress) == 0) { + // If reader is reported 1.0 straight away then we expect there to + // be no key values + Assert.assertEquals(1.0d, reader.getProgress(), 0.0d); + Assert.assertFalse(reader.nextKeyValue()); + } else { + Assert.fail(String.format( + "Expected progress of 0.0 or 1.0 before reader has been accessed for first time but got %f", + progress)); + } + + // Count tuples + boolean debug = LOG.isDebugEnabled(); + while (reader.nextKeyValue()) { + count++; + progress = reader.getProgress(); + if (debug) + LOG.debug(String.format("Current Reported Progress %f", progress)); + Assert.assertTrue(String.format("Progress should be in the range 0.0 < p <= 1.0 but got %f", progress), + progress > 0.0f && progress <= 1.0f); + } + reader.close(); + LOG.info(String.format("Got %d tuples from this record reader", 
count)); + + // Check final progress + LOG.info(String.format("Final Reported Progress %f", reader.getProgress())); + Assert.assertEquals(1.0d, reader.getProgress(), 0.0d); + + return count; + } + + protected final void checkTuples(RecordReader<LongWritable, T> reader, int expected) throws IOException, + InterruptedException { + Assert.assertEquals(expected, this.countTuples(reader)); + } + + /** + * Runs a test with a single input + * + * @param input + * Input + * @param expectedTuples + * Expected tuples + * @throws IOException + * @throws InterruptedException + */ + protected final void testSingleInput(File input, int expectedSplits, int expectedTuples) throws IOException, + InterruptedException { + // Prepare configuration + Configuration config = this.prepareConfiguration(); + this.testSingleInput(config, input, expectedSplits, expectedTuples); + } + + /** + * Runs a test with a single input + * + * @param config + * Configuration + * @param input + * Input + * @param expectedTuples + * Expected tuples + * @throws IOException + * @throws InterruptedException + */ + protected final void testSingleInput(Configuration config, File input, int expectedSplits, int expectedTuples) + throws IOException, InterruptedException { + // Set up fake job + InputFormat<LongWritable, T> inputFormat = this.getInputFormat(); + Job job = Job.getInstance(config); + job.setInputFormatClass(inputFormat.getClass()); + this.addInputPath(input, job.getConfiguration(), job); + JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID()); + Assert.assertEquals(1, FileInputFormat.getInputPaths(context).length); + NLineInputFormat.setNumLinesPerSplit(job, LARGE_SIZE); + + // Check splits + List<InputSplit> splits = inputFormat.getSplits(context); + Assert.assertEquals(expectedSplits, splits.size()); + + // Check tuples + for (InputSplit split : splits) { + TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); + 
RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext); + reader.initialize(split, taskContext); + this.checkTuples(reader, expectedTuples); + } + } + + protected abstract InputFormat<LongWritable, T> getInputFormat(); + + /** + * Basic tuples input test + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public final void single_input_01() throws IOException, InterruptedException, ClassNotFoundException { + testSingleInput(empty, this.canSplitInputs() ? 0 : 1, EMPTY_SIZE); + } + + /** + * Basic tuples input test + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public final void single_input_02() throws IOException, InterruptedException, ClassNotFoundException { + testSingleInput(small, 1, SMALL_SIZE); + } + + /** + * Basic tuples input test + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public final void single_input_03() throws IOException, InterruptedException, ClassNotFoundException { + testSingleInput(large, 1, LARGE_SIZE); + } + + /** + * Basic tuples input test + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public final void single_input_04() throws IOException, InterruptedException, ClassNotFoundException { + testSingleInput(bad, 1, 0); + } + + /** + * Basic tuples input test + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public final void single_input_05() throws IOException, InterruptedException, ClassNotFoundException { + testSingleInput(mixed, 1, MIXED_SIZE / 2); + } + + /** + * Tests behaviour when ignoring bad tuples is disabled + * + * @throws InterruptedException + * @throws IOException + */ + @Test(expected = IOException.class) + public final void fail_on_bad_input_01() throws IOException, 
InterruptedException { + Configuration config = this.prepareConfiguration(); + config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false); + Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true)); + testSingleInput(config, bad, 1, 0); + } + + /** + * Tests behaviour when ignoring bad tuples is disabled + * + * @throws InterruptedException + * @throws IOException + */ + @Test(expected = IOException.class) + public final void fail_on_bad_input_02() throws IOException, InterruptedException { + Configuration config = this.prepareConfiguration(); + config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false); + Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true)); + testSingleInput(config, mixed, 1, MIXED_SIZE / 2); + } + + /** + * Runs a multiple input test + * + * @param inputs + * Inputs + * @param expectedSplits + * Number of splits expected + * @param expectedTuples + * Number of tuples expected + * @throws IOException + * @throws InterruptedException + */ + protected final void testMultipleInputs(File[] inputs, int expectedSplits, int expectedTuples) throws IOException, + InterruptedException { + // Prepare configuration and inputs + Configuration config = this.prepareConfiguration(); + + // Set up fake job + InputFormat<LongWritable, T> inputFormat = this.getInputFormat(); + Job job = Job.getInstance(config); + job.setInputFormatClass(inputFormat.getClass()); + for (File input : inputs) { + this.addInputPath(input, job.getConfiguration(), job); + } + JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID()); + Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length); + NLineInputFormat.setNumLinesPerSplit(job, expectedTuples); + + // Check splits + List<InputSplit> splits = inputFormat.getSplits(context); + Assert.assertEquals(expectedSplits, splits.size()); + + // Check tuples + int count = 0; + for (InputSplit split : splits) { + 
TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); + RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext); + reader.initialize(split, taskContext); + count += this.countTuples(reader); + } + Assert.assertEquals(expectedTuples, count); + } + + /** + * tuples test with multiple inputs + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public final void multiple_inputs_01() throws IOException, InterruptedException, ClassNotFoundException { + testMultipleInputs(new File[] { empty, small, large }, this.canSplitInputs() ? 2 : 3, EMPTY_SIZE + SMALL_SIZE + + LARGE_SIZE); + } + + /** + * tuples test with multiple inputs + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public final void multiple_inputs_02() throws IOException, InterruptedException, ClassNotFoundException { + testMultipleInputs(new File[] { folder.getRoot() }, this.canSplitInputs() ? 
4 : 5, EMPTY_SIZE + SMALL_SIZE + + LARGE_SIZE + (MIXED_SIZE / 2)); + } + + protected final void testSplitInputs(Configuration config, File[] inputs, int expectedSplits, int expectedTuples) + throws IOException, InterruptedException { + // Set up fake job + InputFormat<LongWritable, T> inputFormat = this.getInputFormat(); + Job job = Job.getInstance(config); + job.setInputFormatClass(inputFormat.getClass()); + for (File input : inputs) { + this.addInputPath(input, job.getConfiguration(), job); + } + JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID()); + Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length); + + // Check splits + List<InputSplit> splits = inputFormat.getSplits(context); + Assert.assertEquals(expectedSplits, splits.size()); + + // Check tuples + int count = 0; + for (InputSplit split : splits) { + // Validate split + Assert.assertTrue(this.isValidSplit(split, config)); + + // Read split + TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); + RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext); + reader.initialize(split, taskContext); + count += this.countTuples(reader); + } + Assert.assertEquals(expectedTuples, count); + } + + /** + * Determines whether an input split is valid + * + * @param split + * Input split + * @return True if a valid split, false otherwise + * @throws IOException + */ + protected boolean isValidSplit(InputSplit split, Configuration config) throws IOException { + return split instanceof FileSplit; + } + + /** + * Indicates whether inputs can be split, defaults to true + * + * @return Whether inputs can be split + */ + protected boolean canSplitInputs() { + return true; + } + + /** + * Tests for input splitting + * + * @throws IOException + * @throws InterruptedException + * @throws ClassNotFoundException + */ + @Test + public final void split_input_01() throws IOException, 
InterruptedException, ClassNotFoundException { + Assume.assumeTrue(this.canSplitInputs()); + + Configuration config = this.prepareConfiguration(); + config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false); + Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE)); + this.testSplitInputs(config, new File[] { small }, 100, SMALL_SIZE); + } + + /** + * Tests for input splitting + * + * @throws IOException + * @throws InterruptedException + * @throws ClassNotFoundException + */ + @Test + public final void split_input_02() throws IOException, InterruptedException, ClassNotFoundException { + Assume.assumeTrue(this.canSplitInputs()); + + Configuration config = this.prepareConfiguration(); + config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false); + config.setLong(NLineInputFormat.LINES_PER_MAP, 10); + Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE)); + this.testSplitInputs(config, new File[] { small }, 10, SMALL_SIZE); + } + + /** + * Tests for input splitting + * + * @throws IOException + * @throws InterruptedException + * @throws ClassNotFoundException + */ + @Test + public final void split_input_03() throws IOException, InterruptedException, ClassNotFoundException { + Assume.assumeTrue(this.canSplitInputs()); + + Configuration config = this.prepareConfiguration(); + config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false); + config.setLong(NLineInputFormat.LINES_PER_MAP, 100); + Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE)); + this.testSplitInputs(config, new File[] { large }, 100, LARGE_SIZE); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java ---------------------------------------------------------------------- diff 
--git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java new file mode 100644 index 0000000..78d7f33 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; + +import org.apache.jena.hadoop.rdf.types.QuadWritable; + +import com.hp.hpl.jena.sparql.core.Quad; + +/** + * Abstract tests for Quad input formats + * + * + */ +public abstract class AbstractQuadsInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> { + + private static final Charset utf8 = Charset.forName("utf-8"); + + @Override + protected void generateTuples(OutputStream output, int num) throws IOException { + for (int i = 0; i < num; i++) { + output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n").getBytes(utf8)); + } + output.flush(); + output.close(); + } + + @Override + protected void generateBadTuples(OutputStream output, int num) throws IOException { + for (int i = 0; i < num; i++) { + output.write("<http://broken\n".getBytes(utf8)); + } + output.flush(); + output.close(); + } + + @Override + protected void generateMixedTuples(OutputStream output, int num) throws IOException { + boolean bad = false; + for (int i = 0; i < num; i++, bad = !bad) { + if (bad) { + output.write("<http://broken\n".getBytes(utf8)); + } else { + output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n").getBytes(utf8)); + } + } + output.flush(); + output.close(); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java new file mode 100644 index 0000000..65a9889 --- 
/dev/null +++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; + +import org.apache.jena.hadoop.rdf.types.TripleWritable; + +import com.hp.hpl.jena.graph.Triple; + +/** + * Abstract tests for Triple input formats + * + * + * + */ +public abstract class AbstractTriplesInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> { + + private static final Charset utf8 = Charset.forName("utf-8"); + + @Override + protected void generateTuples(OutputStream output, int num) throws IOException { + for (int i = 0; i < num; i++) { + output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n").getBytes(utf8)); + } + output.flush(); + output.close(); + } + + @Override + protected void generateBadTuples(OutputStream output, int num) throws IOException { + byte[] junk = "<http://broken\n".getBytes(utf8); + for (int i = 0; i < num; i++) { + output.write(junk); + } + output.flush(); + output.close(); + } + + 
@Override + protected void generateMixedTuples(OutputStream output, int num) throws IOException { + boolean bad = false; + for (int i = 0; i < num; i++, bad = !bad) { + if (bad) { + output.write("<http://broken\n".getBytes(utf8)); + } else { + output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n").getBytes(utf8)); + } + } + output.flush(); + output.close(); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java new file mode 100644 index 0000000..0b6cfde --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; + +import org.apache.jena.hadoop.rdf.types.QuadWritable; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.RDFDataMgr; +import org.apache.jena.riot.RDFWriterRegistry; + +import com.hp.hpl.jena.query.Dataset; +import com.hp.hpl.jena.query.DatasetFactory; +import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelFactory; +import com.hp.hpl.jena.rdf.model.Property; +import com.hp.hpl.jena.rdf.model.Resource; +import com.hp.hpl.jena.sparql.core.Quad; + +/** + * Abstract tests for Quad input formats + * + * + * + */ +public abstract class AbstractWholeFileQuadInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> { + + private static final Charset utf8 = Charset.forName("utf-8"); + + @Override + protected boolean canSplitInputs() { + return false; + } + + private void writeTuples(Dataset ds, OutputStream output) { + RDFDataMgr.write(output, ds, RDFWriterRegistry.defaultSerialization(this.getRdfLanguage())); + } + + /** + * Gets the RDF language to write out generate tuples in + * + * @return RDF language + */ + protected abstract Lang getRdfLanguage(); + + private void writeGoodTuples(OutputStream output, int num) throws IOException { + Dataset ds = DatasetFactory.createMem(); + Model m = ModelFactory.createDefaultModel(); + Resource currSubj = m.createResource("http://example.org/subjects/0"); + Property predicate = m.createProperty("http://example.org/predicate"); + for (int i = 0; i < num; i++) { + if (i % 100 == 0) { + ds.addNamedModel("http://example.org/graphs/" + (i / 100), m); + m = ModelFactory.createDefaultModel(); + } + if (i % 10 == 0) { + currSubj = m.createResource("http://example.org/subjects/" + (i / 10)); + } + m.add(currSubj, predicate, m.createTypedLiteral(i)); + } + if (!m.isEmpty()) { + ds.addNamedModel("http://example.org/graphs/extra", m); + } 
+ this.writeTuples(ds, output); + } + + @Override + protected final void generateTuples(OutputStream output, int num) throws IOException { + this.writeGoodTuples(output, num); + output.close(); + } + + @Override + protected final void generateMixedTuples(OutputStream output, int num) throws IOException { + // Write good data + this.writeGoodTuples(output, num / 2); + + // Write junk data + byte[] junk = "junk data\n".getBytes(utf8); + for (int i = 0; i < num / 2; i++) { + output.write(junk); + } + + output.flush(); + output.close(); + } + + @Override + protected final void generateBadTuples(OutputStream output, int num) throws IOException { + byte[] junk = "junk data\n".getBytes(utf8); + for (int i = 0; i < num; i++) { + output.write(junk); + } + output.flush(); + output.close(); + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java new file mode 100644 index 0000000..b68d662 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; + +import org.apache.jena.hadoop.rdf.types.TripleWritable; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.RDFDataMgr; + +import com.hp.hpl.jena.graph.Triple; +import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelFactory; +import com.hp.hpl.jena.rdf.model.Property; +import com.hp.hpl.jena.rdf.model.Resource; + +/** + * Abstract tests for Triple input formats + * + * + * + */ +public abstract class AbstractWholeFileTripleInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> { + + private static final Charset utf8 = Charset.forName("utf-8"); + + @Override + protected boolean canSplitInputs() { + return false; + } + + private void writeTuples(Model m, OutputStream output) { + RDFDataMgr.write(output, m, this.getRdfLanguage()); + } + + /** + * Gets the RDF language to write out generate tuples in + * @return RDF language + */ + protected abstract Lang getRdfLanguage(); + + @Override + protected final void generateTuples(OutputStream output, int num) throws IOException { + Model m = ModelFactory.createDefaultModel(); + Resource currSubj = m.createResource("http://example.org/subjects/0"); + Property predicate = m.createProperty("http://example.org/predicate"); + for (int i = 0; i < num; i++) { + if (i % 10 == 0) { + currSubj = m.createResource("http://example.org/subjects/" + (i / 10)); + } + m.add(currSubj, predicate, 
m.createTypedLiteral(i)); + } + this.writeTuples(m, output); + output.close(); + } + + @Override + protected final void generateMixedTuples(OutputStream output, int num) throws IOException { + // Write good data + Model m = ModelFactory.createDefaultModel(); + Resource currSubj = m.createResource("http://example.org/subjects/0"); + Property predicate = m.createProperty("http://example.org/predicate"); + for (int i = 0; i < num / 2; i++) { + if (i % 10 == 0) { + currSubj = m.createResource("http://example.org/subjects/" + (i / 10)); + } + m.add(currSubj, predicate, m.createTypedLiteral(i)); + } + this.writeTuples(m, output); + + // Write junk data + byte[] junk = "junk data\n".getBytes(utf8); + for (int i = 0; i < num / 2; i++) { + output.write(junk); + } + + output.flush(); + output.close(); + } + + @Override + protected final void generateBadTuples(OutputStream output, int num) throws IOException { + byte[] junk = "junk data\n".getBytes(utf8); + for (int i = 0; i < num; i++) { + output.write(junk); + } + output.flush(); + output.close(); + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractBlankNodeTests.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractBlankNodeTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractBlankNodeTests.java new file mode 100644 index 0000000..4bb0939 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractBlankNodeTests.java @@ -0,0 +1,636 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jena.hadoop.rdf.io.input.bnodes; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.nio.file.attribute.FileAttribute; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.jena.hadoop.rdf.io.RdfIOConstants; +import 
org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable; +import org.apache.jena.riot.system.ParserProfile; +import org.apache.log4j.BasicConfigurator; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.hp.hpl.jena.graph.Node; +import com.hp.hpl.jena.graph.NodeFactory; + +/** + * Test case that embodies the scenario described in JENA-820 + */ +@SuppressWarnings("unused") +public abstract class AbstractBlankNodeTests<T, TValue extends AbstractNodeTupleWritable<T>> { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractBlankNodeTests.class); + + @BeforeClass + public static void setup() { + // Enable if you need to diagnose test failures + // Useful since it includes printing the file names of the temporary + // files being used + // BasicConfigurator.resetConfiguration(); + // BasicConfigurator.configure(); + } + + /** + * Gets the extension for the initial input files + * + * @return Extension including the {@code .} + */ + protected abstract String getInitialInputExtension(); + + /** + * Creates a tuple + * + * @param s + * Subject + * @param p + * Predicate + * @param o + * Object + * @return Tuple + */ + protected abstract T createTuple(Node s, Node p, Node o); + + /** + * Writes out the given tuples to the given file + * + * @param f + * File + * @param tuples + * Tuples + * @throws FileNotFoundException + */ + protected abstract void writeTuples(File f, List<T> tuples) throws FileNotFoundException; + + /** + * Creates the input format for reading the initial inputs + * + * @return Input format + */ + protected abstract InputFormat<LongWritable, TValue> createInitialInputFormat(); + + /** + * Creates the output format for writing the intermediate output + * + * @return Output format + */ + protected abstract OutputFormat<LongWritable, TValue> createIntermediateOutputFormat(); + + /** + * Creates the input format 
for reading the intermediate outputs back in + * + * @return Input format + */ + protected abstract InputFormat<LongWritable, TValue> createIntermediateInputFormat(); + + /** + * Gets the subject of the tuple + * + * @param value + * Tuple + * @return Subject + */ + protected abstract Node getSubject(T value); + + /** + * Gets whether the format being tested respects the RIOT + * {@link ParserProfile} + * + * @return True if parser profile is respected, false otherwise + */ + protected boolean respectsParserProfile() { + return true; + } + + /** + * Gets whether the format being tested preserves blank node identity + * + * @return True if identity is presereved, false otherwise + */ + protected boolean preservesBlankNodeIdentity() { + return false; + } + + /** + * Test that starts with two blank nodes with the same identity in a single + * file, splits them over two files and checks that we can workaround + * JENA-820 successfully by setting the + * {@link RdfIOConstants#GLOBAL_BNODE_IDENTITY} flag for our subsequent job + * + * @throws IOException + * @throws InterruptedException + */ + @Test + public final void blank_node_divergence_01() throws IOException, InterruptedException { + Assume.assumeTrue("Requires ParserProfile be respected", this.respectsParserProfile()); + Assume.assumeFalse("Requires that Blank Node identity not be preserved", this.preservesBlankNodeIdentity()); + + // Temporary files + File a = File.createTempFile("bnode_divergence", getInitialInputExtension()); + File intermediateOutputDir = Files.createTempDirectory("bnode_divergence", new FileAttribute[0]).toFile(); + + try { + // Prepare the input data + // Two mentions of the same blank node in the same file + List<T> tuples = new ArrayList<>(); + Node bnode = NodeFactory.createAnon(); + Node pred = NodeFactory.createURI("http://example.org/predicate"); + tuples.add(createTuple(bnode, pred, NodeFactory.createLiteral("first"))); + tuples.add(createTuple(bnode, pred, 
NodeFactory.createLiteral("second"))); + writeTuples(a, tuples); + + // Set up fake job which will process the file as a single split + Configuration config = new Configuration(true); + InputFormat<LongWritable, TValue> inputFormat = createInitialInputFormat(); + Job job = Job.getInstance(config); + job.setInputFormatClass(inputFormat.getClass()); + NLineInputFormat.setNumLinesPerSplit(job, 100); + FileInputFormat.setInputPaths(job, new Path(a.getAbsolutePath())); + FileOutputFormat.setOutputPath(job, new Path(intermediateOutputDir.getAbsolutePath())); + JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID()); + + // Get the splits + List<InputSplit> splits = inputFormat.getSplits(context); + Assert.assertEquals(1, splits.size()); + + for (InputSplit split : splits) { + // Initialize the input reading + TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(), + createAttemptID(1, 1, 1)); + RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext); + reader.initialize(split, inputTaskContext); + + // Copy the input to the output - each triple goes to a separate + // output file + // This is how we force multiple files to be produced + int taskID = 1; + while (reader.nextKeyValue()) { + // Prepare the output writing + OutputFormat<LongWritable, TValue> outputFormat = createIntermediateOutputFormat(); + TaskAttemptContext outputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(), + createAttemptID(1, ++taskID, 1)); + RecordWriter<LongWritable, TValue> writer = outputFormat.getRecordWriter(outputTaskContext); + + writer.write(reader.getCurrentKey(), reader.getCurrentValue()); + writer.close(outputTaskContext); + } + } + + // Promote outputs from temporary status + promoteInputs(intermediateOutputDir); + + // Now we need to create a subsequent job that reads the + // intermediate outputs + // As described in JENA-820 at this point the blank nodes are + // 
consistent, however when we read them from different files they + // by default get treated as different nodes and so the blank nodes + // diverge which is incorrect and undesirable behaviour in + // multi-stage pipelines + System.out.println(intermediateOutputDir.getAbsolutePath()); + job = Job.getInstance(config); + inputFormat = createIntermediateInputFormat(); + job.setInputFormatClass(inputFormat.getClass()); + FileInputFormat.setInputPaths(job, new Path(intermediateOutputDir.getAbsolutePath())); + + // Enabling this flag works around the JENA-820 issue + job.getConfiguration().setBoolean(RdfIOConstants.GLOBAL_BNODE_IDENTITY, true); + context = new JobContextImpl(job.getConfiguration(), job.getJobID()); + + // Get the splits + splits = inputFormat.getSplits(context); + Assert.assertEquals(2, splits.size()); + + // Expect to end up with a single blank node + Set<Node> nodes = new HashSet<Node>(); + for (InputSplit split : splits) { + TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(), + new TaskAttemptID()); + RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext); + reader.initialize(split, inputTaskContext); + + while (reader.nextKeyValue()) { + nodes.add(getSubject(reader.getCurrentValue().get())); + } + } + // Nodes should not have diverged + Assert.assertEquals(1, nodes.size()); + + } finally { + a.delete(); + deleteDirectory(intermediateOutputDir); + } + } + + /** + * Test that starts with two blank nodes with the same identity in a single + * file, splits them over two files and shows that they diverge in the + * subsequent job when the JENA-820 workaround is not enabled + * + * @throws IOException + * @throws InterruptedException + */ + @Test + public void blank_node_divergence_02() throws IOException, InterruptedException { + Assume.assumeTrue("Requires ParserProfile be respected", this.respectsParserProfile()); + Assume.assumeFalse("Requires that Blank Node identity not 
be preserved", this.preservesBlankNodeIdentity()); + + // Temporary files + File a = File.createTempFile("bnode_divergence", getInitialInputExtension()); + File intermediateOutputDir = Files.createTempDirectory("bnode_divergence", new FileAttribute[0]).toFile(); + + try { + // Prepare the input data + // Two mentions of the same blank node in the same file + List<T> tuples = new ArrayList<>(); + Node bnode = NodeFactory.createAnon(); + Node pred = NodeFactory.createURI("http://example.org/predicate"); + tuples.add(createTuple(bnode, pred, NodeFactory.createLiteral("first"))); + tuples.add(createTuple(bnode, pred, NodeFactory.createLiteral("second"))); + writeTuples(a, tuples); + + // Set up fake job which will process the file as a single split + Configuration config = new Configuration(true); + InputFormat<LongWritable, TValue> inputFormat = createInitialInputFormat(); + Job job = Job.getInstance(config); + job.setInputFormatClass(inputFormat.getClass()); + NLineInputFormat.setNumLinesPerSplit(job, 100); + FileInputFormat.setInputPaths(job, new Path(a.getAbsolutePath())); + FileOutputFormat.setOutputPath(job, new Path(intermediateOutputDir.getAbsolutePath())); + JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID()); + + // Get the splits + List<InputSplit> splits = inputFormat.getSplits(context); + Assert.assertEquals(1, splits.size()); + + for (InputSplit split : splits) { + // Initialize the input reading + TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(), + createAttemptID(1, 1, 1)); + RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext); + reader.initialize(split, inputTaskContext); + + // Copy the input to the output - each triple goes to a separate + // output file + // This is how we force multiple files to be produced + int taskID = 1; + while (reader.nextKeyValue()) { + // Prepare the output writing + OutputFormat<LongWritable, TValue> 
outputFormat = createIntermediateOutputFormat(); + TaskAttemptContext outputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(), + createAttemptID(1, ++taskID, 1)); + RecordWriter<LongWritable, TValue> writer = outputFormat.getRecordWriter(outputTaskContext); + + writer.write(reader.getCurrentKey(), reader.getCurrentValue()); + writer.close(outputTaskContext); + } + } + + // Promote outputs from temporary status + promoteInputs(intermediateOutputDir); + + // Now we need to create a subsequent job that reads the + // intermediate outputs + // As described in JENA-820 at this point the blank nodes are + // consistent, however when we read them from different files they + // by default get treated as different nodes and so the blank nodes + // diverge which is incorrect and undesirable behaviour in + // multi-stage pipelines. However it is the default behaviour + // because when we start from external inputs we want them to be + // file scoped. + System.out.println(intermediateOutputDir.getAbsolutePath()); + job = Job.getInstance(config); + inputFormat = createIntermediateInputFormat(); + job.setInputFormatClass(inputFormat.getClass()); + FileInputFormat.setInputPaths(job, new Path(intermediateOutputDir.getAbsolutePath())); + + // Make sure JENA-820 flag is disabled + job.getConfiguration().setBoolean(RdfIOConstants.GLOBAL_BNODE_IDENTITY, false); + context = new JobContextImpl(job.getConfiguration(), job.getJobID()); + + // Get the splits + splits = inputFormat.getSplits(context); + Assert.assertEquals(2, splits.size()); + + // Expect to end up with a single blank node + Set<Node> nodes = new HashSet<Node>(); + for (InputSplit split : splits) { + TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(), + new TaskAttemptID()); + RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext); + reader.initialize(split, inputTaskContext); + + while (reader.nextKeyValue()) { + 
nodes.add(getSubject(reader.getCurrentValue().get())); + } + } + // Nodes should have diverged + Assert.assertEquals(2, nodes.size()); + + } finally { + a.delete(); + deleteDirectory(intermediateOutputDir); + } + } + + /** + * Test that starts with two blank nodes in two different files and checks + * that writing them to a single file does not conflate them + * + * @throws IOException + * @throws InterruptedException + */ + @Test + public void blank_node_identity_01() throws IOException, InterruptedException { + Assume.assumeTrue("Requires ParserProfile be respected", this.respectsParserProfile()); + Assume.assumeFalse("Requires that Blank Node identity not be preserved", this.preservesBlankNodeIdentity()); + + // Temporary files + File a = File.createTempFile("bnode_identity", getInitialInputExtension()); + File b = File.createTempFile("bnode_identity", getInitialInputExtension()); + File intermediateOutputDir = Files.createTempDirectory("bnode_identity", new FileAttribute[0]).toFile(); + + try { + // Prepare the input data + // Different blank nodes in different files + List<T> tuples = new ArrayList<>(); + Node bnode1 = NodeFactory.createAnon(); + Node bnode2 = NodeFactory.createAnon(); + Node pred = NodeFactory.createURI("http://example.org/predicate"); + + tuples.add(createTuple(bnode1, pred, NodeFactory.createLiteral("first"))); + writeTuples(a, tuples); + + tuples.clear(); + tuples.add(createTuple(bnode2, pred, NodeFactory.createLiteral("second"))); + writeTuples(b, tuples); + + // Set up fake job which will process the two files + Configuration config = new Configuration(true); + InputFormat<LongWritable, TValue> inputFormat = createInitialInputFormat(); + Job job = Job.getInstance(config); + job.setInputFormatClass(inputFormat.getClass()); + NLineInputFormat.setNumLinesPerSplit(job, 100); + FileInputFormat.setInputPaths(job, new Path(a.getAbsolutePath()), new Path(b.getAbsolutePath())); + FileOutputFormat.setOutputPath(job, new 
Path(intermediateOutputDir.getAbsolutePath())); + JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID()); + + // Get the splits + List<InputSplit> splits = inputFormat.getSplits(context); + Assert.assertEquals(2, splits.size()); + + // Prepare the output writing - putting all output to a single file + OutputFormat<LongWritable, TValue> outputFormat = createIntermediateOutputFormat(); + TaskAttemptContext outputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(), createAttemptID( + 1, 2, 1)); + RecordWriter<LongWritable, TValue> writer = outputFormat.getRecordWriter(outputTaskContext); + + for (InputSplit split : splits) { + // Initialize the input reading + TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(), + createAttemptID(1, 1, 1)); + RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext); + reader.initialize(split, inputTaskContext); + + // Copy the input to the output - all triples go to a single + // output + while (reader.nextKeyValue()) { + writer.write(reader.getCurrentKey(), reader.getCurrentValue()); + } + } + writer.close(outputTaskContext); + + // Promote outputs from temporary status + promoteInputs(intermediateOutputDir); + + // Now we need to create a subsequent job that reads the + // intermediate outputs + // The Blank nodes should have been given separate identities so we + // should not be conflating them, this is the opposite problem to + // that described in JENA-820 + System.out.println(intermediateOutputDir.getAbsolutePath()); + job = Job.getInstance(config); + inputFormat = createIntermediateInputFormat(); + job.setInputFormatClass(inputFormat.getClass()); + NLineInputFormat.setNumLinesPerSplit(job, 100); + FileInputFormat.setInputPaths(job, new Path(intermediateOutputDir.getAbsolutePath())); + context = new JobContextImpl(job.getConfiguration(), job.getJobID()); + + // Get the splits + splits = 
inputFormat.getSplits(context); + Assert.assertEquals(1, splits.size()); + + // Expect to end up with a single blank node + Set<Node> nodes = new HashSet<Node>(); + for (InputSplit split : splits) { + TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(), + new TaskAttemptID()); + RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext); + reader.initialize(split, inputTaskContext); + + while (reader.nextKeyValue()) { + nodes.add(getSubject(reader.getCurrentValue().get())); + } + } + // Nodes must not have converged + Assert.assertEquals(2, nodes.size()); + + } finally { + a.delete(); + b.delete(); + deleteDirectory(intermediateOutputDir); + } + } + + /** + * Test that starts with two blank nodes in two different files and checks + * that writing them to a single file does not conflate them + * + * @throws IOException + * @throws InterruptedException + */ + @Test + public void blank_node_identity_02() throws IOException, InterruptedException { + Assume.assumeTrue("Requires ParserProfile be respected", this.respectsParserProfile()); + Assume.assumeFalse("Requires that Blank Node identity not be preserved", this.preservesBlankNodeIdentity()); + + // Temporary files + File a = File.createTempFile("bnode_identity", getInitialInputExtension()); + File b = File.createTempFile("bnode_identity", getInitialInputExtension()); + File intermediateOutputDir = Files.createTempDirectory("bnode_identity", new FileAttribute[0]).toFile(); + + try { + // Prepare the input data + // Same blank node but in different files so must be treated as + // different blank nodes and not converge + List<T> tuples = new ArrayList<>(); + Node bnode = NodeFactory.createAnon(); + Node pred = NodeFactory.createURI("http://example.org/predicate"); + + tuples.add(createTuple(bnode, pred, NodeFactory.createLiteral("first"))); + writeTuples(a, tuples); + + tuples.clear(); + tuples.add(createTuple(bnode, pred, 
NodeFactory.createLiteral("second"))); + writeTuples(b, tuples); + + // Set up fake job which will process the two files + Configuration config = new Configuration(true); + InputFormat<LongWritable, TValue> inputFormat = createInitialInputFormat(); + Job job = Job.getInstance(config); + job.setInputFormatClass(inputFormat.getClass()); + NLineInputFormat.setNumLinesPerSplit(job, 100); + FileInputFormat.setInputPaths(job, new Path(a.getAbsolutePath()), new Path(b.getAbsolutePath())); + FileOutputFormat.setOutputPath(job, new Path(intermediateOutputDir.getAbsolutePath())); + JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID()); + + // Get the splits + List<InputSplit> splits = inputFormat.getSplits(context); + Assert.assertEquals(2, splits.size()); + + // Prepare the output writing - putting all output to a single file + OutputFormat<LongWritable, TValue> outputFormat = createIntermediateOutputFormat(); + TaskAttemptContext outputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(), createAttemptID( + 1, 2, 1)); + RecordWriter<LongWritable, TValue> writer = outputFormat.getRecordWriter(outputTaskContext); + + for (InputSplit split : splits) { + // Initialize the input reading + TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(), + createAttemptID(1, 1, 1)); + RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext); + reader.initialize(split, inputTaskContext); + + // Copy the input to the output - all triples go to a single + // output + while (reader.nextKeyValue()) { + writer.write(reader.getCurrentKey(), reader.getCurrentValue()); + } + } + writer.close(outputTaskContext); + + // Promote outputs from temporary status + promoteInputs(intermediateOutputDir); + + // Now we need to create a subsequent job that reads the + // intermediate outputs + // The Blank nodes should have been given separate identities so we + // should not be conflating them, 
this is the opposite problem to + // that described in JENA-820 + System.out.println(intermediateOutputDir.getAbsolutePath()); + job = Job.getInstance(config); + inputFormat = createIntermediateInputFormat(); + job.setInputFormatClass(inputFormat.getClass()); + NLineInputFormat.setNumLinesPerSplit(job, 100); + FileInputFormat.setInputPaths(job, new Path(intermediateOutputDir.getAbsolutePath())); + context = new JobContextImpl(job.getConfiguration(), job.getJobID()); + + // Get the splits + splits = inputFormat.getSplits(context); + Assert.assertEquals(1, splits.size()); + + // Expect to end up with a single blank node + Set<Node> nodes = new HashSet<Node>(); + for (InputSplit split : splits) { + TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(), + new TaskAttemptID()); + RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext); + reader.initialize(split, inputTaskContext); + + while (reader.nextKeyValue()) { + nodes.add(getSubject(reader.getCurrentValue().get())); + } + } + // Nodes must not diverge + Assert.assertEquals(2, nodes.size()); + + } finally { + a.delete(); + b.delete(); + deleteDirectory(intermediateOutputDir); + } + } + + private TaskAttemptID createAttemptID(int jobID, int taskID, int id) { + return new TaskAttemptID("outputTest", jobID, TaskType.MAP, taskID, 1); + } + + private void promoteInputs(File baseDir) throws IOException { + for (File f : baseDir.listFiles()) { + if (f.isDirectory()) { + promoteInputs(baseDir, f); + } + } + } + + private void promoteInputs(File targetDir, File dir) throws IOException { + java.nio.file.Path target = Paths.get(targetDir.toURI()); + for (File f : dir.listFiles()) { + if (f.isDirectory()) { + promoteInputs(targetDir, f); + } else { + LOGGER.debug("Moving {} to {}", f.getAbsolutePath(), target.resolve(f.getName())); + Files.move(Paths.get(f.toURI()), target.resolve(f.getName()), StandardCopyOption.REPLACE_EXISTING); + } + } + + // 
Remove defunct sub-directory + dir.delete(); + } + + private void deleteDirectory(File dir) throws IOException { + for (File f : dir.listFiles()) { + if (f.isFile()) + f.delete(); + if (f.isDirectory()) + deleteDirectory(f); + } + dir.delete(); + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractTripleBlankNodeTests.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractTripleBlankNodeTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractTripleBlankNodeTests.java new file mode 100644 index 0000000..bbd6742 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractTripleBlankNodeTests.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jena.hadoop.rdf.io.input.bnodes; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.util.List; + +import org.apache.jena.hadoop.rdf.types.TripleWritable; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.RDFDataMgr; + +import com.hp.hpl.jena.graph.Graph; +import com.hp.hpl.jena.graph.Node; +import com.hp.hpl.jena.graph.Triple; +import com.hp.hpl.jena.sparql.graph.GraphFactory; + +/** + * + */ +public abstract class AbstractTripleBlankNodeTests extends AbstractBlankNodeTests<Triple, TripleWritable> { + + /** + * Gets the language to use + * + * @return Language + */ + protected abstract Lang getLanguage(); + + @Override + protected Triple createTuple(Node s, Node p, Node o) { + return new Triple(s, p, o); + } + + @Override + protected void writeTuples(File f, List<Triple> tuples) throws FileNotFoundException { + Graph g = GraphFactory.createGraphMem(); + for (Triple t : tuples) { + g.add(t); + } + RDFDataMgr.write(new FileOutputStream(f), g, getLanguage()); + } + + @Override + protected Node getSubject(Triple value) { + return value.getSubject(); + } + +}
