http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/trig/BatchedTriGOutputTest.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/trig/BatchedTriGOutputTest.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/trig/BatchedTriGOutputTest.java new file mode 100644 index 0000000..fd886a3 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/trig/BatchedTriGOutputTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.output.trig; + +import java.util.Arrays; +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.jena.hadoop.rdf.io.RdfIOConstants; +import org.apache.jena.hadoop.rdf.io.output.AbstractQuadOutputFormatTests; +import org.apache.jena.hadoop.rdf.types.QuadWritable; +import org.apache.jena.riot.Lang; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + + +/** + * Tests for TriG output + * + * + * + */ +@RunWith(Parameterized.class) +public class BatchedTriGOutputTest extends AbstractQuadOutputFormatTests { + + static long $bs1 = RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE; + static long $bs2 = 1000; + static long $bs3 = 100; + static long $bs4 = 1; + + /** + * @return Test parameters + */ + @Parameters + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { { $bs1 }, { $bs2 }, { $bs3 }, { $bs4 } }); + } + + private final long batchSize; + + /** + * Creates new tests + * + * @param batchSize + * Batch size + */ + public BatchedTriGOutputTest(long batchSize) { + this.batchSize = batchSize; + } + + @Override + protected String getFileExtension() { + return ".trig"; + } + + @Override + protected Lang getRdfLanguage() { + return Lang.TRIG; + } + + @Override + protected Configuration prepareConfiguration() { + Configuration config = super.prepareConfiguration(); + config.setLong(RdfIOConstants.OUTPUT_BATCH_SIZE, this.batchSize); + return config; + } + + @Override + protected OutputFormat<NullWritable, QuadWritable> getOutputFormat() { + return new BatchedTriGOutputFormat<NullWritable>(); + } + +}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/trig/StreamedTriGOutputTest.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/trig/StreamedTriGOutputTest.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/trig/StreamedTriGOutputTest.java new file mode 100644 index 0000000..9b2b669 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/trig/StreamedTriGOutputTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.output.trig; + +import java.util.Arrays; +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.jena.hadoop.rdf.io.RdfIOConstants; +import org.apache.jena.hadoop.rdf.io.output.AbstractQuadOutputFormatTests; +import org.apache.jena.hadoop.rdf.types.QuadWritable; +import org.apache.jena.riot.Lang; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + + +/** + * Tests for TriG output + * + * + * + */ +@RunWith(Parameterized.class) +public class StreamedTriGOutputTest extends AbstractQuadOutputFormatTests { + + static long $bs1 = RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE; + static long $bs2 = 1000; + static long $bs3 = 100; + static long $bs4 = 1; + + /** + * @return Test parameters + */ + @Parameters + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { { $bs1 }, { $bs2 }, { $bs3 }, { $bs4 } }); + } + + private final long batchSize; + + /** + * Creates new tests + * + * @param batchSize + * Batch size + */ + public StreamedTriGOutputTest(long batchSize) { + this.batchSize = batchSize; + } + + @Override + protected String getFileExtension() { + return ".trig"; + } + + @Override + protected Lang getRdfLanguage() { + return Lang.TRIG; + } + + @Override + protected Configuration prepareConfiguration() { + Configuration config = super.prepareConfiguration(); + config.setLong(RdfIOConstants.OUTPUT_BATCH_SIZE, this.batchSize); + return config; + } + + @Override + protected OutputFormat<NullWritable, QuadWritable> getOutputFormat() { + return new TriGOutputFormat<NullWritable>(); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/trig/TriGBlankNodeOutputTests.java 
---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/trig/TriGBlankNodeOutputTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/trig/TriGBlankNodeOutputTests.java new file mode 100644 index 0000000..c9b3a26 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/trig/TriGBlankNodeOutputTests.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.output.trig; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.jena.hadoop.rdf.io.RdfIOConstants; +import org.apache.jena.hadoop.rdf.types.QuadWritable; +import org.apache.jena.riot.RDFDataMgr; +import org.junit.Assert; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.hp.hpl.jena.datatypes.xsd.XSDDatatype; +import com.hp.hpl.jena.graph.Node; +import com.hp.hpl.jena.graph.NodeFactory; +import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ResIterator; +import com.hp.hpl.jena.rdf.model.Resource; +import com.hp.hpl.jena.sparql.core.Quad; + +/** + * Tests for TriG output with blank nodes + * + * + * + */ +@RunWith(Parameterized.class) +public class TriGBlankNodeOutputTests extends StreamedTriGOutputTest { + + static long $bs1 = RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE; + static long $bs2 = 1000; + static long $bs3 = 100; + static long $bs4 = 1; + + /** + * @return Test parameters + */ + @Parameters + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { { $bs1 }, { $bs2 }, { $bs3 }, + { $bs4 } }); + } + + /** + * Creates new tests + * + * @param batchSize + * Batch size + */ + public TriGBlankNodeOutputTests(long batchSize) { + super(batchSize); + } + + @Override + protected Iterator<QuadWritable> generateTuples(int num) { + List<QuadWritable> qs = new ArrayList<QuadWritable>(); + Node subject = NodeFactory.createAnon(); + for (int i = 0; i < num; i++) { + Quad t = new Quad( + NodeFactory.createURI("http://example.org/graphs/" + i), + subject, + NodeFactory.createURI("http://example.org/predicate"), + 
NodeFactory.createLiteral(Integer.toString(i), + XSDDatatype.XSDinteger)); + qs.add(new QuadWritable(t)); + } + return qs.iterator(); + } + + @Override + protected void checkTuples(File f, long expected) { + super.checkTuples(f, expected); + + Model m = RDFDataMgr.loadModel("file://" + f.getAbsolutePath(), + this.getRdfLanguage()); + ResIterator iter = m.listSubjects(); + Set<Node> subjects = new HashSet<Node>(); + while (iter.hasNext()) { + Resource res = iter.next(); + Assert.assertTrue(res.isAnon()); + subjects.add(res.asNode()); + } + // Should only be one subject unless the data was empty in which case + // there will be zero subjects + Assert.assertEquals(expected == 0 ? 0 : 1, subjects.size()); + } + + @Override + protected OutputFormat<NullWritable, QuadWritable> getOutputFormat() { + return new TriGOutputFormat<NullWritable>(); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/trix/TriXOutputTest.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/trix/TriXOutputTest.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/trix/TriXOutputTest.java new file mode 100644 index 0000000..9b6e307 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/trix/TriXOutputTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.output.trix; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.jena.hadoop.rdf.io.output.AbstractQuadOutputFormatTests; +import org.apache.jena.hadoop.rdf.types.QuadWritable; +import org.apache.jena.riot.Lang; + +/** + * Tests for TriX output format + */ +public class TriXOutputTest extends AbstractQuadOutputFormatTests { + + @Override + protected String getFileExtension() { + return ".trix"; + } + + @Override + protected Lang getRdfLanguage() { + return Lang.TRIX; + } + + @Override + protected OutputFormat<NullWritable, QuadWritable> getOutputFormat() { + return new TriXOutputFormat<NullWritable>(); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/turtle/BatchedTurtleOutputTest.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/turtle/BatchedTurtleOutputTest.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/turtle/BatchedTurtleOutputTest.java new file mode 100644 index 0000000..a6c4d70 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/turtle/BatchedTurtleOutputTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.output.turtle; + +import java.util.Arrays; +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.jena.hadoop.rdf.io.RdfIOConstants; +import org.apache.jena.hadoop.rdf.io.output.AbstractTripleOutputFormatTests; +import org.apache.jena.hadoop.rdf.types.TripleWritable; +import org.apache.jena.riot.Lang; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + + +/** + * Tests for Turtle output + * + * + * + */ +@RunWith(Parameterized.class) +public class BatchedTurtleOutputTest extends AbstractTripleOutputFormatTests { + + static long $bs1 = RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE; + static long $bs2 = 1000; + static long $bs3 = 100; + static long $bs4 = 1; + + /** + * @return Test parameters + */ + @Parameters + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { { $bs1 }, { $bs2 }, { $bs3 }, { $bs4 } }); + } + + private final long batchSize; + + /** + * Creates new tests + * + * @param batchSize + * Batch size + */ + public BatchedTurtleOutputTest(long batchSize) { + this.batchSize = batchSize; + } 
+ + @Override + protected String getFileExtension() { + return ".ttl"; + } + + @Override + protected Lang getRdfLanguage() { + return Lang.TURTLE; + } + + @Override + protected Configuration prepareConfiguration() { + Configuration config = super.prepareConfiguration(); + config.setLong(RdfIOConstants.OUTPUT_BATCH_SIZE, this.batchSize); + return config; + } + + @Override + protected OutputFormat<NullWritable, TripleWritable> getOutputFormat() { + return new BatchedTurtleOutputFormat<NullWritable>(); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/turtle/StreamedTurtleOutputTest.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/turtle/StreamedTurtleOutputTest.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/turtle/StreamedTurtleOutputTest.java new file mode 100644 index 0000000..d8843d3 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/turtle/StreamedTurtleOutputTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.output.turtle; + +import java.util.Arrays; +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.jena.hadoop.rdf.io.RdfIOConstants; +import org.apache.jena.hadoop.rdf.io.output.AbstractTripleOutputFormatTests; +import org.apache.jena.hadoop.rdf.types.TripleWritable; +import org.apache.jena.riot.Lang; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + + +/** + * Tests for Turtle output + * + * + * + */ +@RunWith(Parameterized.class) +public class StreamedTurtleOutputTest extends AbstractTripleOutputFormatTests { + + static long $bs1 = RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE; + static long $bs2 = 1000; + static long $bs3 = 100; + static long $bs4 = 1; + + /** + * @return Test parameters + */ + @Parameters + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { { $bs1 }, { $bs2 }, { $bs3 }, { $bs4 } }); + } + + private final long batchSize; + + /** + * Creates new tests + * + * @param batchSize + * Batch size + */ + public StreamedTurtleOutputTest(long batchSize) { + this.batchSize = batchSize; + } + + @Override + protected String getFileExtension() { + return ".ttl"; + } + + @Override + protected Lang getRdfLanguage() { + return Lang.TURTLE; + } + + @Override + protected Configuration prepareConfiguration() { + Configuration config = super.prepareConfiguration(); + config.setLong(RdfIOConstants.OUTPUT_BATCH_SIZE, this.batchSize); + return config; + } + + @Override + protected OutputFormat<NullWritable, TripleWritable> getOutputFormat() { + return new TurtleOutputFormat<NullWritable>(); + } + +} 
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/turtle/TurtleBlankNodeOutputTests.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/turtle/TurtleBlankNodeOutputTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/turtle/TurtleBlankNodeOutputTests.java new file mode 100644 index 0000000..8dcae4e --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/turtle/TurtleBlankNodeOutputTests.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.output.turtle; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.jena.hadoop.rdf.io.RdfIOConstants; +import org.apache.jena.hadoop.rdf.types.TripleWritable; +import org.apache.jena.riot.RDFDataMgr; +import org.junit.Assert; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.hp.hpl.jena.datatypes.xsd.XSDDatatype; +import com.hp.hpl.jena.graph.Node; +import com.hp.hpl.jena.graph.NodeFactory; +import com.hp.hpl.jena.graph.Triple; +import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ResIterator; +import com.hp.hpl.jena.rdf.model.Resource; + +/** + * Tests for Turtle output with blank nodes + * + * + * + */ +@RunWith(Parameterized.class) +public class TurtleBlankNodeOutputTests extends StreamedTurtleOutputTest { + + static long $bs1 = RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE; + static long $bs2 = 1000; + static long $bs3 = 100; + static long $bs4 = 1; + + /** + * @return Test parameters + */ + @Parameters + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { { $bs1 }, { $bs2 }, { $bs3 }, + { $bs4 } }); + } + + /** + * Creates new tests + * + * @param batchSize + * Batch size + */ + public TurtleBlankNodeOutputTests(long batchSize) { + super(batchSize); + } + + @Override + protected Iterator<TripleWritable> generateTuples(int num) { + List<TripleWritable> ts = new ArrayList<TripleWritable>(); + Node subject = NodeFactory.createAnon(); + for (int i = 0; i < num; i++) { + Triple t = new Triple(subject, + NodeFactory.createURI("http://example.org/predicate"), + NodeFactory.createLiteral(Integer.toString(i), + 
XSDDatatype.XSDinteger)); + ts.add(new TripleWritable(t)); + } + return ts.iterator(); + } + + @Override + protected void checkTuples(File f, long expected) { + super.checkTuples(f, expected); + + Model m = RDFDataMgr.loadModel("file://" + f.getAbsolutePath(), + this.getRdfLanguage()); + ResIterator iter = m.listSubjects(); + Set<Node> subjects = new HashSet<Node>(); + while (iter.hasNext()) { + Resource res = iter.next(); + Assert.assertTrue(res.isAnon()); + subjects.add(res.asNode()); + } + // Should only be one subject unless the data was empty in which case + // there will be zero subjects + Assert.assertEquals(expected == 0 ? 0 : 1, subjects.size()); + } + + @Override + protected OutputFormat<NullWritable, TripleWritable> getOutputFormat() { + return new TurtleOutputFormat<NullWritable>(); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/registry/TestHadoopRdfIORegistry.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/registry/TestHadoopRdfIORegistry.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/registry/TestHadoopRdfIORegistry.java new file mode 100644 index 0000000..2eae232 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/registry/TestHadoopRdfIORegistry.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jena.hadoop.rdf.io.registry; + +import java.io.IOException; +import java.io.StringWriter; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.jena.hadoop.rdf.types.QuadWritable; +import org.apache.jena.hadoop.rdf.types.TripleWritable; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.RDFLanguages; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for the {@link HadoopRdfIORegistry} + */ +public class TestHadoopRdfIORegistry { + + private void testLang(Lang lang, boolean triples, boolean quads, boolean writesSupported) { + Assert.assertEquals(triples, HadoopRdfIORegistry.hasTriplesReader(lang)); + Assert.assertEquals(quads, HadoopRdfIORegistry.hasQuadReader(lang)); + + // Some formats may be asymmetric + if (writesSupported) { + Assert.assertEquals(triples, HadoopRdfIORegistry.hasTriplesWriter(lang)); + Assert.assertEquals(quads, HadoopRdfIORegistry.hasQuadWriter(lang)); + } else { + Assert.assertFalse(HadoopRdfIORegistry.hasTriplesWriter(lang)); + Assert.assertFalse(HadoopRdfIORegistry.hasQuadWriter(lang)); + } + + if (triples) { + // Check that triples are supported + RecordReader<LongWritable, TripleWritable> tripleReader; + try { + tripleReader = HadoopRdfIORegistry.createTripleReader(lang); + Assert.assertNotNull(tripleReader); + } catch (IOException e) { + Assert.fail("Registry indicates that " 
+ lang.getName() + + " can read triples but fails to produce a triple reader when asked: " + e.getMessage()); + } + + if (writesSupported) { + RecordWriter<NullWritable, TripleWritable> tripleWriter; + try { + tripleWriter = HadoopRdfIORegistry.createTripleWriter(lang, new StringWriter(), new Configuration( + false)); + Assert.assertNotNull(tripleWriter); + } catch (IOException e) { + Assert.fail("Registry indicates that " + lang.getName() + + " can write triples but fails to produce a triple writer when asked: " + e.getMessage()); + } + } + } else { + // Check that triples are not supported + try { + HadoopRdfIORegistry.createTripleReader(lang); + Assert.fail("Registry indicates that " + lang.getName() + + " cannot read triples but produced a triple reader when asked (error was expected)"); + } catch (IOException e) { + // This is expected + } + try { + HadoopRdfIORegistry.createTripleWriter(lang, new StringWriter(), new Configuration(false)); + Assert.fail("Registry indicates that " + lang.getName() + + " cannot write triples but produced a triple writer when asked (error was expected)"); + } catch (IOException e) { + // This is expected + } + } + + if (quads) { + // Check that quads are supported + RecordReader<LongWritable, QuadWritable> quadReader; + try { + quadReader = HadoopRdfIORegistry.createQuadReader(lang); + Assert.assertNotNull(quadReader); + } catch (IOException e) { + Assert.fail("Registry indicates that " + lang.getName() + + " can read quads but fails to produce a quad reader when asked: " + e.getMessage()); + } + + if (writesSupported) { + RecordWriter<NullWritable, QuadWritable> quadWriter; + try { + quadWriter = HadoopRdfIORegistry.createQuadWriter(lang, new StringWriter(), + new Configuration(false)); + Assert.assertNotNull(quadWriter); + } catch (IOException e) { + Assert.fail("Registry indicates that " + lang.getName() + + " can write quads but fails to produce a quad writer when asked: " + e.getMessage()); + } + } + } else { + try { 
HadoopRdfIORegistry.createQuadReader(lang); + Assert.fail("Registry indicates that " + lang.getName() + + " cannot read quads but produced a quad reader when asked (error was expected)"); + } catch (IOException e) { + // This is expected + } + try { + HadoopRdfIORegistry.createQuadWriter(lang, new StringWriter(), new Configuration(false)); + Assert.fail("Registry indicates that " + lang.getName() + + " cannot write quads but produced a quad writer when asked (error was expected)"); + } catch (IOException e) { + // This is expected + } + } + } + + @Test + public void json_ld_registered() { + testLang(Lang.JSONLD, true, true, true); + } + + @Test + public void nquads_registered() { + testLang(Lang.NQUADS, false, true, true); + testLang(Lang.NQ, false, true, true); + } + + @Test + public void ntriples_registered() { + testLang(Lang.NTRIPLES, true, false, true); + testLang(Lang.NT, true, false, true); + } + + @Test + public void rdf_json_registered() { + testLang(Lang.RDFJSON, true, false, true); + } + + @Test + public void rdf_xml_registered() { + testLang(Lang.RDFXML, true, false, true); + } + + @Test + public void rdf_thrift_registered() { + testLang(RDFLanguages.THRIFT, true, true, true); + } + + @Test + public void trig_registered() { + testLang(Lang.TRIG, false, true, true); + } + + @Test + public void trix_registered() { + testLang(Lang.TRIX, false, true, true); + } + + @Test + public void turtle_registered() { + testLang(Lang.TURTLE, true, false, true); + testLang(Lang.TTL, true, false, true); + testLang(Lang.N3, true, false, true); + } + + @Test + public void unregistered() { + testLang(Lang.RDFNULL, false, false, true); + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-mapreduce/pom.xml ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-mapreduce/pom.xml b/jena-elephas/jena-elephas-mapreduce/pom.xml new file mode 100644 index 0000000..aed59be --- /dev/null +++ 
b/jena-elephas/jena-elephas-mapreduce/pom.xml @@ -0,0 +1,87 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.jena</groupId> + <artifactId>jena-elephas</artifactId> + <version>0.9.0-SNAPSHOT</version> + </parent> + <artifactId>jena-elephas-mapreduce</artifactId> + <name>Apache Jena - Elephas - Map/Reduce</name> + <description>Contains some basic Map/Reduce implementations for working with RDF on Hadoop</description> + + <dependencies> + <!-- Internal Project Dependencies --> + <dependency> + <groupId>org.apache.jena</groupId> + <artifactId>jena-elephas-common</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- Hadoop Dependencies --> + <!-- Note these will be provided on the Hadoop cluster hence the provided + scope --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + 
<artifactId>hadoop-mapreduce-client-common</artifactId> + <scope>provided</scope> + </dependency> + + <!-- Jena dependencies --> + <dependency> + <groupId>org.apache.jena</groupId> + <artifactId>jena-arq</artifactId> + </dependency> + + <!-- Test Dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.mrunit</groupId> + <artifactId>mrunit</artifactId> + <scope>test</scope> + <classifier>hadoop2</classifier> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- JAR plugin to ensure tests jar is built --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyMapper.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyMapper.java b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyMapper.java new file mode 100644 index 0000000..306a697 --- /dev/null +++ b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyMapper.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.Mapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A mapper which discards the value replacing it with the key + * + * + * @param <TKey> Key type + * @param <TValue> Value type + */ +public class KeyMapper<TKey, TValue> extends Mapper<TKey, TValue, TKey, TKey> { + private static final Logger LOG = LoggerFactory.getLogger(KeyMapper.class); + + private boolean tracing = false; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + this.tracing = LOG.isTraceEnabled(); + } + + @Override + protected void map(TKey key, TValue value, Context context) throws IOException, + InterruptedException { + if (this.tracing) { + LOG.trace("Key = {}", key); + } + context.write(key, key); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyPlusNullMapper.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyPlusNullMapper.java b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyPlusNullMapper.java new file mode 100644 index 0000000..a6e9a6a --- /dev/null +++ b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyPlusNullMapper.java @@ -0,0 +1,55 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A mapper which discards the value replacing it with a null + * + * + * @param <TKey> Key type + * @param <TValue> Value type + */ +public class KeyPlusNullMapper<TKey, TValue> extends Mapper<TKey, TValue, TKey, NullWritable> { + private static final Logger LOG = LoggerFactory.getLogger(KeyPlusNullMapper.class); + + private boolean tracing = false; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + this.tracing = LOG.isTraceEnabled(); + } + + @Override + protected void map(TKey key, TValue value, Context context) throws IOException, + InterruptedException { + if (this.tracing) { + LOG.trace("Key = {}", key); + } + context.write(key, NullWritable.get()); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyReducer.java 
---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyReducer.java b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyReducer.java new file mode 100644 index 0000000..7805f16 --- /dev/null +++ b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyReducer.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.Reducer; + +/** + * A reducer that outputs a single pair consists of the key as both fields ignoring the values + * @author rvesse + * + * @param <TKey> Key + * @param <TValue> Value + */ +public class KeyReducer<TKey, TValue> extends Reducer<TKey, TValue, TKey, TKey> { + + @Override + protected void reduce(TKey key, Iterable<TValue> values, Context context) + throws IOException, InterruptedException { + context.write(key, key); + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusKeyMapper.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusKeyMapper.java b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusKeyMapper.java new file mode 100644 index 0000000..7a48c1d --- /dev/null +++ b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusKeyMapper.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A mapper which discards the value, moves the key to the value position and uses a null key + * + * + * @param <TKey> Key type + * @param <TValue> Value type + */ +public class NullPlusKeyMapper<TKey, TValue> extends Mapper<TKey, TValue, NullWritable, TKey> { + private static final Logger LOG = LoggerFactory.getLogger(NullPlusKeyMapper.class); + + private boolean tracing = false; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + this.tracing = LOG.isTraceEnabled(); + } + + @Override + protected void map(TKey key, TValue value, Context context) throws IOException, + InterruptedException { + if (this.tracing) { + LOG.trace("Key = {}", key); + } + context.write(NullWritable.get(), key); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusKeyReducer.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusKeyReducer.java b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusKeyReducer.java new file mode 100644 index 0000000..dfc6ec1 --- /dev/null +++ b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusKeyReducer.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.mapreduce; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Reducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A reducer that outputs a single pair consists of a null as the key and the key as the value + * @author rvesse + * + * @param <TKey> Key + * @param <TValue> Value + */ +public class NullPlusKeyReducer<TKey, TValue> extends Reducer<TKey, TValue, NullWritable, TKey> { + + private static final Logger LOGGER = LoggerFactory.getLogger(NullPlusKeyReducer.class); + private boolean tracing = false; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + this.tracing = LOGGER.isTraceEnabled(); + } + + @Override + protected void reduce(TKey key, Iterable<TValue> values, Context context) + throws IOException, InterruptedException { + if (this.tracing) { + LOGGER.trace("Input Key = {}", key); + Iterator<TValue> iter = values.iterator(); + while (iter.hasNext()) { + LOGGER.trace("Input Value = {}", iter.next()); + } + } + context.write(NullWritable.get(), key); + } +} 
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusValueMapper.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusValueMapper.java b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusValueMapper.java new file mode 100644 index 0000000..a5ac199 --- /dev/null +++ b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusValueMapper.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A mapper which discards the key replacing it with a null leaving the value as is + * + * + * @param <TKey> Key type + * @param <TValue> Value type + */ +public class NullPlusValueMapper<TKey, TValue> extends Mapper<TKey, TValue, NullWritable, TValue> { + private static final Logger LOG = LoggerFactory.getLogger(NullPlusValueMapper.class); + + private boolean tracing = false; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + this.tracing = LOG.isTraceEnabled(); + } + + @Override + protected void map(TKey key, TValue value, Context context) throws IOException, + InterruptedException { + if (this.tracing) { + LOG.trace("Value = {}", value); + } + context.write(NullWritable.get(), value); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusValueReducer.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusValueReducer.java b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusValueReducer.java new file mode 100644 index 0000000..c6b270f --- /dev/null +++ b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusValueReducer.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.mapreduce; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Reducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A reducer that outputs a pair for each value consisting of a null key and the + * value + * + * @author rvesse + * + * @param <TKey> + * Key + * @param <TValue> + * Value + */ +public class NullPlusValueReducer<TKey, TValue> extends Reducer<TKey, TValue, NullWritable, TValue> { + private static final Logger LOGGER = LoggerFactory.getLogger(NullPlusValueReducer.class); + private boolean tracing = false; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + this.tracing = LOGGER.isTraceEnabled(); + } + + @Override + protected void reduce(TKey key, Iterable<TValue> values, Context context) throws IOException, InterruptedException { + if (this.tracing) { + LOGGER.trace("Input Key = {}", key); + } + Iterator<TValue> iter = values.iterator(); + while (iter.hasNext()) { + TValue value = iter.next(); + if (tracing) { + LOGGER.trace("Input Value = {}", value); + } + context.write(NullWritable.get(), value); + } + } +} 
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/RdfMapReduceConstants.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/RdfMapReduceConstants.java b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/RdfMapReduceConstants.java new file mode 100644 index 0000000..6a8cf18 --- /dev/null +++ b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/RdfMapReduceConstants.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.mapreduce; + +/** + * RDF Map/Reduce related constants + * + * + * + */ +public class RdfMapReduceConstants { + + /** + * Private constructor prevents instantiation + */ + private RdfMapReduceConstants() { + + } + + /** + * Configuration key used to set whether the behaviour of the filter mappers + * is inverted. When enabled the filter mappers will invert their selection + * i.e. tuples that would normally be accepted will be rejected and vice + * versa. 
+ */ + public static final String FILTER_INVERT = "rdf.mapreduce.filter.invert"; + + /** + * Configuration key used to set a command separated list of predicate URIs + * to filter upon + */ + public static final String FILTER_PREDICATE_URIS = "rdf.mapreduce.filter.predicate.uris"; + + /** + * Configuration key used to set a command separated list of subject URIs to + * filter upon + */ + public static final String FILTER_SUBJECT_URIS = "rdf.mapreduce.filter.subject.uris"; + + /** + * Configuration key used to set a command separated list of object URIs to + * filter upon + */ + public static final String FILTER_OBJECT_URIS = "rdf.mapreduce.filter.object.uris"; + + /** + * Configuration key used to set a command separated list of graph URIs to + * filter upon + */ + public static final String FILTER_GRAPH_URIS = "rdf.mapreduce.filter.graph.uris"; +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/SwapMapper.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/SwapMapper.java b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/SwapMapper.java new file mode 100644 index 0000000..ef518a9 --- /dev/null +++ b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/SwapMapper.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.Mapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A mapper which swaps the key and value around + * + * + * @param <TKey> Key type + * @param <TValue> Value type + */ +public class SwapMapper<TKey, TValue> extends Mapper<TKey, TValue, TValue, TKey> { + private static final Logger LOG = LoggerFactory.getLogger(SwapMapper.class); + + private boolean tracing = false; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + this.tracing = LOG.isTraceEnabled(); + } + + @Override + protected void map(TKey key, TValue value, Context context) throws IOException, + InterruptedException { + if (this.tracing) { + LOG.trace("Key = {}", key); + LOG.trace("Value = {}", value); + } + context.write(value, key); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/SwapReducer.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/SwapReducer.java b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/SwapReducer.java new file mode 100644 index 0000000..e7e42a0 --- /dev/null +++ b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/SwapReducer.java @@ -0,0 +1,43 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.mapreduce; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.mapreduce.Reducer; + +/** + * A reducer that swaps the key and value + * @author rvesse + * + * @param <TKey> Key + * @param <TValue> Value + */ +public class SwapReducer<TKey, TValue> extends Reducer<TKey, TValue, TValue, TKey> { + + @Override + protected void reduce(TKey key, Iterable<TValue> values, Context context) + throws IOException, InterruptedException { + Iterator<TValue> iter = values.iterator(); + while (iter.hasNext()) { + context.write(iter.next(), key); + } + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/TextCountReducer.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/TextCountReducer.java b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/TextCountReducer.java new file mode 100644 index 0000000..04b9283 --- /dev/null +++ 
b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/TextCountReducer.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.mapreduce; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Reducer; + +/** + * A reducer which takes text keys with a sequence of longs representing counts + * as the values and sums the counts together into pairs consisting of a node + * key and a count value. 
+ * + * + * + */ +public class TextCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> { + + @Override + protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, + InterruptedException { + long count = 0; + Iterator<LongWritable> iter = values.iterator(); + while (iter.hasNext()) { + count += iter.next().get(); + } + context.write(key, new LongWritable(count)); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValueMapper.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValueMapper.java b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValueMapper.java new file mode 100644 index 0000000..23ae5f0 --- /dev/null +++ b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValueMapper.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.Mapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A mapper which discards the key replacing it with the value + * + * + * @param <TKey> Key type + * @param <TValue> Value type + */ +public class ValueMapper<TKey, TValue> extends Mapper<TKey, TValue, TValue, TValue> { + private static final Logger LOG = LoggerFactory.getLogger(ValueMapper.class); + + private boolean tracing = false; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + this.tracing = LOG.isTraceEnabled(); + } + + @Override + protected void map(TKey key, TValue value, Context context) throws IOException, + InterruptedException { + if (this.tracing) { + LOG.trace("Value = {}", value); + } + context.write(value, value); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValuePlusNullMapper.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValuePlusNullMapper.java b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValuePlusNullMapper.java new file mode 100644 index 0000000..094fb2d --- /dev/null +++ b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValuePlusNullMapper.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A mapper which discards the key replacing it with the value and nulls out the value + * + * + * @param <TKey> Key type + * @param <TValue> Value type + */ +public class ValuePlusNullMapper<TKey, TValue> extends Mapper<TKey, TValue, TValue, NullWritable> { + private static final Logger LOG = LoggerFactory.getLogger(ValuePlusNullMapper.class); + + private boolean tracing = false; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + this.tracing = LOG.isTraceEnabled(); + } + + @Override + protected void map(TKey key, TValue value, Context context) throws IOException, + InterruptedException { + if (this.tracing) { + LOG.trace("Value = {}", value); + } + context.write(value, NullWritable.get()); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValueReducer.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValueReducer.java b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValueReducer.java new file mode 100644 index 0000000..7d25799 --- /dev/null +++ 
b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValueReducer.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.mapreduce; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.mapreduce.Reducer; + +/** + * A reducer that outputs a pair for each value consisting of the value as both the key and value + * @author rvesse + * + * @param <TKey> Key + * @param <TValue> Value + */ +public class ValueReducer<TKey, TValue> extends Reducer<TKey, TValue, TValue, TValue> { + + @Override + protected void reduce(TKey key, Iterable<TValue> values, Context context) + throws IOException, InterruptedException { + Iterator<TValue> iter = values.iterator(); + while (iter.hasNext()) { + TValue value = iter.next(); + context.write(value, value); + } + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/characteristics/AbstractCharacteristicSetGeneratingReducer.java ---------------------------------------------------------------------- diff --git 
/**
 * Abstract reducer which takes in tuples grouped by some node and generating
 * initial characteristic sets.
 * <p>
 * This produces the characteristic sets as both the key and value so that in a
 * subsequent job the characteristic steps may be further combined together to
 * total up the usage counts appropriately.
 * </p>
 * <p>
 * It is important to note that the output from this mapper can be very large
 * and since it typically needs to be written to HDFS before being processed by
 * further jobs it is strongly recommended that you use appropriate output
 * compression
 * </p>
 *
 *
 *
 * @param <TValue>
 *            Tuple type
 * @param <T>
 *            Writable tuple type
 */
public abstract class AbstractCharacteristicSetGeneratingReducer<TValue, T extends AbstractNodeTupleWritable<TValue>> extends
        Reducer<NodeWritable, T, CharacteristicSetWritable, NullWritable> {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractCharacteristicSetGeneratingReducer.class);

    // Cached trace-enabled flag so the per-record paths avoid repeated
    // log-level checks
    private boolean tracing = false;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        this.tracing = LOG.isTraceEnabled();
    }

    /**
     * Builds the distinct predicate characteristics for one group of tuples
     * and then emits every possible non-empty characteristic set (i.e. every
     * subset of the predicates). NOTE(review): the number of emitted sets is
     * exponential in the number of distinct predicates, hence the class-level
     * advice to enable output compression.
     */
    @Override
    protected void reduce(NodeWritable key, Iterable<T> values, Context context) throws IOException, InterruptedException {
        // TreeMap keeps predicates in a deterministic (sorted) order which
        // makes subset generation order stable across runs
        Map<NodeWritable, CharacteristicWritable> characteristics = new TreeMap<NodeWritable, CharacteristicWritable>();

        // Firstly need to find individual characteristics
        Iterator<T> iter = values.iterator();
        while (iter.hasNext()) {
            T tuple = iter.next();
            NodeWritable predicate = this.getPredicate(tuple);

            // Count one usage per occurrence of each distinct predicate
            if (characteristics.containsKey(predicate)) {
                characteristics.get(predicate).increment();
            } else {
                characteristics.put(predicate, new CharacteristicWritable(predicate.get()));
            }
        }

        // Then we need to produce all the possible characteristic sets based on
        // this information
        List<CharacteristicWritable> cs = new ArrayList<CharacteristicWritable>(characteristics.values());
        if (cs.size() == 0)
            return;
        // Emit all subsets of every size from 1 up to the full set
        for (int i = 1; i <= cs.size(); i++) {
            this.outputSets(cs, i, context);
        }
    }

    /**
     * Output all sets of a given size
     *
     * @param cs
     *            Characteristics
     * @param perSet
     *            Set size
     * @param context
     *            Context to output sets to
     * @throws IOException
     * @throws InterruptedException
     */
    protected void outputSets(List<CharacteristicWritable> cs, int perSet, Context context) throws IOException,
            InterruptedException {
        if (perSet == 1) {
            // Size 1: each characteristic is its own singleton set
            for (CharacteristicWritable c : cs) {
                CharacteristicSetWritable set = new CharacteristicSetWritable(c);
                context.write(set, NullWritable.get());
                if (this.tracing) {
                    LOG.trace("Key = {}", set);
                }
            }
        } else if (perSet == cs.size()) {
            // Full size: exactly one set containing every characteristic
            CharacteristicSetWritable set = new CharacteristicSetWritable();
            for (CharacteristicWritable c : cs) {
                set.add(c);
            }
            context.write(set, NullWritable.get());
            if (this.tracing) {
                LOG.trace("Key = {}", set);
            }
        } else {
            // Intermediate sizes: enumerate combinations recursively
            CharacteristicWritable[] members = new CharacteristicWritable[perSet];
            this.combinations(cs, perSet, 0, members, context);
        }
    }

    /**
     * Calculate all available combinations of N elements from the given
     * characteristics
     *
     * @param cs
     *            Characteristics
     * @param len
     *            Desired number of elements
     * @param startPosition
     *            Start position
     * @param result
     *            Result array to fill
     * @param context
     *            Context to write completed combinations to
     * @throws IOException
     * @throws InterruptedException
     */
    protected final void combinations(List<CharacteristicWritable> cs, int len, int startPosition,
            CharacteristicWritable[] result, Context context) throws IOException, InterruptedException {
        if (len == 0) {
            // result is fully populated - emit this combination as a set
            CharacteristicSetWritable set = new CharacteristicSetWritable(result);
            context.write(set, NullWritable.get());
            if (this.tracing) {
                LOG.trace("Key = {}", set);
            }
            return;
        }
        // Choose each remaining candidate for the current slot, then recurse
        // to fill the rest of the slots from the elements after it
        for (int i = startPosition; i <= cs.size() - len; i++) {
            // result.length - len is the next slot still to be filled
            result[result.length - len] = cs.get(i);
            combinations(cs, len - 1, i + 1, result, context);
        }
    }

    /**
     * Gets the predicate for the tuple
     *
     * @param tuple
     *            Tuple
     * @return Predicate node of the tuple
     */
    protected abstract NodeWritable getPredicate(T tuple);

}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.mapreduce.characteristics; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.jena.hadoop.rdf.types.CharacteristicSetWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Reducer which takes in characteristic sets and sums up all their usage counts + * + * + */ +public class CharacteristicSetReducer extends + Reducer<CharacteristicSetWritable, CharacteristicSetWritable, CharacteristicSetWritable, NullWritable> { + + private static final Logger LOG = LoggerFactory.getLogger(CharacteristicSetReducer.class); + private boolean tracing = false; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + this.tracing = LOG.isTraceEnabled(); + } + + @Override + protected void reduce(CharacteristicSetWritable key, Iterable<CharacteristicSetWritable> values, Context context) + throws IOException, InterruptedException { + Iterator<CharacteristicSetWritable> iter = values.iterator(); + CharacteristicSetWritable output = new CharacteristicSetWritable(0); + + if (this.tracing) { + LOG.trace("Key = {}", key); + } + + while (iter.hasNext()) { + CharacteristicSetWritable set = iter.next(); + if (this.tracing) { + LOG.trace("Value = {}", set); + } + 
output.add(set); + } + + context.write(output, NullWritable.get()); + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/characteristics/QuadCharacteristicSetGeneratingReducer.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/characteristics/QuadCharacteristicSetGeneratingReducer.java b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/characteristics/QuadCharacteristicSetGeneratingReducer.java new file mode 100644 index 0000000..d11cd56 --- /dev/null +++ b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/characteristics/QuadCharacteristicSetGeneratingReducer.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.mapreduce.characteristics; + +import org.apache.jena.hadoop.rdf.types.NodeWritable; +import org.apache.jena.hadoop.rdf.types.QuadWritable; + +import com.hp.hpl.jena.sparql.core.Quad; + +/** + * A reducer which converts quads grouped by some node into characteristic sets + * + * + * + */ +public class QuadCharacteristicSetGeneratingReducer extends AbstractCharacteristicSetGeneratingReducer<Quad, QuadWritable> { + + @Override + protected NodeWritable getPredicate(QuadWritable tuple) { + return new NodeWritable(tuple.get().getPredicate()); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/characteristics/TripleCharacteristicSetGeneratingReducer.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/characteristics/TripleCharacteristicSetGeneratingReducer.java b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/characteristics/TripleCharacteristicSetGeneratingReducer.java new file mode 100644 index 0000000..6515c91 --- /dev/null +++ b/jena-elephas/jena-elephas-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/characteristics/TripleCharacteristicSetGeneratingReducer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.mapreduce.characteristics; + +import org.apache.jena.hadoop.rdf.types.NodeWritable; +import org.apache.jena.hadoop.rdf.types.TripleWritable; + +import com.hp.hpl.jena.graph.Triple; + +/** + * A reducer which converts triples grouped by some node into characteristic + * sets + * + * + * + */ +public class TripleCharacteristicSetGeneratingReducer extends AbstractCharacteristicSetGeneratingReducer<Triple, TripleWritable> { + + @Override + protected NodeWritable getPredicate(TripleWritable tuple) { + return new NodeWritable(tuple.get().getPredicate()); + } + +}
