http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormatTests.java
new file mode 100644
index 0000000..31c3a67
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormatTests.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFDataMgr;
+import org.apache.jena.riot.lang.StreamRDFCounting;
+import org.apache.jena.riot.system.StreamRDFLib;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Abstract node tuple output format tests
+ * 
+ * 
+ * @param <TValue>
+ *            Tuple type
+ * @param <T>
+ *            Writable tuple type
+ * 
+ */
+public abstract class AbstractNodeTupleOutputFormatTests<TValue, T extends AbstractNodeTupleWritable<TValue>> {
+
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeTupleOutputFormatTests.class);
+
+    protected static final int EMPTY_SIZE = 0, SMALL_SIZE = 100, LARGE_SIZE = 10000, VERY_LARGE_SIZE = 100000;
+
+    /**
+     * Temporary folder for the tests
+     */
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+
+    /**
+     * Prepares a fresh configuration
+     * 
+     * @return Configuration
+     */
+    protected Configuration prepareConfiguration() {
+        Configuration config = new Configuration(true);
+        // Nothing else to do
+        return config;
+    }
+
+    /**
+     * Gets the extra file extension to add to the filenames
+     * 
+     * @return File extension
+     */
+    protected abstract String getFileExtension();
+
+    /**
+     * Generates tuples to be output for testing
+     * 
+     * @param num
+     *            Number of tuples to generate
+     * @return Iterator of tuples
+     */
+    protected abstract Iterator<T> generateTuples(int num);
+
+    /**
+     * Counts tuples in the output file
+     * 
+     * @param f
+     *            Output file
+     * @return Tuple count
+     */
+    protected final long countTuples(File f) {
+        StreamRDFCounting counter = StreamRDFLib.count();
+        RDFDataMgr.parse(counter, f.getAbsolutePath(), this.getRdfLanguage(), null);
+        return counter.count();
+    }
+
+    /**
+     * Checks that tuples are as expected
+     * @param f File
+     * @param expected Expected number of tuples
+     */
+    protected void checkTuples(File f, long expected) {
+        Assert.assertEquals(expected, this.countTuples(f));
+    }
+
+    /**
+     * Gets the RDF language of the produced output, which is used to parse the
+     * output back in and validate that the correct amount of output was produced
+     * 
+     * @return RDF language
+     */
+    protected abstract Lang getRdfLanguage();
+
+    /**
+     * Gets the output format to test
+     * 
+     * @return Output format
+     */
+    protected abstract OutputFormat<NullWritable, T> getOutputFormat();
+
+    /**
+     * Adds an output path to the job configuration
+     * 
+     * @param f
+     *            File
+     * @param config
+     *            Configuration
+     * @param job
+     *            Job
+     * @throws IOException
+     */
+    protected void addOutputPath(File f, Configuration config, Job job) throws IOException {
+        FileSystem fs = FileSystem.getLocal(config);
+        Path outputPath = fs.makeQualified(new Path(f.getAbsolutePath()));
+        FileOutputFormat.setOutputPath(job, outputPath);
+    }
+    
+    protected File findOutputFile(File dir, JobContext context) throws FileNotFoundException, IOException {
+        Path outputPath = FileOutputFormat.getOutputPath(context);
+        RemoteIterator<LocatedFileStatus> files = outputPath.getFileSystem(context.getConfiguration()).listFiles(outputPath, true);
+        while (files.hasNext()) {
+            LocatedFileStatus status = files.next();
+            if (status.isFile() && !status.getPath().getName().startsWith("_")) {
+                return new File(status.getPath().toUri());
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Tests output
+     * 
+     * @param f
+     *            File to output to
+     * @param num
+     *            Number of tuples to output
+     * @throws IOException
+     * @throws InterruptedException 
+     */
+    protected final void testOutput(File f, int num) throws IOException, InterruptedException {
+        // Prepare configuration
+        Configuration config = this.prepareConfiguration();
+
+        // Set up fake job
+        OutputFormat<NullWritable, T> outputFormat = this.getOutputFormat();
+        Job job = Job.getInstance(config);
+        job.setOutputFormatClass(outputFormat.getClass());
+        this.addOutputPath(f, job.getConfiguration(), job);
+        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+        Assert.assertNotNull(FileOutputFormat.getOutputPath(context));
+
+        // Output the data
+        TaskAttemptID id = new TaskAttemptID("outputTest", 1, TaskType.MAP, 1, 1);
+        TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), id);
+        RecordWriter<NullWritable, T> writer = outputFormat.getRecordWriter(taskContext);
+        Iterator<T> tuples = this.generateTuples(num);
+        while (tuples.hasNext()) {
+            writer.write(NullWritable.get(), tuples.next());
+        }
+        writer.close(taskContext);
+        
+        // Check output
+        File outputFile = this.findOutputFile(this.folder.getRoot(), context);
+        Assert.assertNotNull(outputFile);
+        this.checkTuples(outputFile, num);
+    }
+
+    /**
+     * Basic output tests
+     * 
+     * @throws IOException
+     * @throws InterruptedException 
+     */
+    @Test
+    public final void output_01() throws IOException, InterruptedException {
+        this.testOutput(this.folder.getRoot(), EMPTY_SIZE);
+    }
+    
+    /**
+     * Basic output tests
+     * 
+     * @throws IOException
+     * @throws InterruptedException 
+     */
+    @Test
+    public final void output_02() throws IOException, InterruptedException {
+        this.testOutput(this.folder.getRoot(), SMALL_SIZE);
+    }
+    
+    /**
+     * Basic output tests
+     * 
+     * @throws IOException
+     * @throws InterruptedException 
+     */
+    @Test
+    public final void output_03() throws IOException, InterruptedException {
+        this.testOutput(this.folder.getRoot(), LARGE_SIZE);
+    }
+    
+    /**
+     * Basic output tests
+     * 
+     * @throws IOException
+     * @throws InterruptedException 
+     */
+    @Test
+    public final void output_04() throws IOException, InterruptedException {
+        this.testOutput(this.folder.getRoot(), VERY_LARGE_SIZE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/AbstractQuadOutputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/AbstractQuadOutputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/AbstractQuadOutputFormatTests.java
new file mode 100644
index 0000000..f1822f6
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/AbstractQuadOutputFormatTests.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+import com.hp.hpl.jena.datatypes.xsd.XSDDatatype;
+import com.hp.hpl.jena.graph.NodeFactory;
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * Abstract tests for quad output formats
+ * 
+ * 
+ * 
+ */
+public abstract class AbstractQuadOutputFormatTests extends AbstractNodeTupleOutputFormatTests<Quad, QuadWritable> {
+
+    @Override
+    protected Iterator<QuadWritable> generateTuples(int num) {
+        List<QuadWritable> qs = new ArrayList<QuadWritable>();
+        for (int i = 0; i < num; i++) {
+            Quad q = new Quad(NodeFactory.createURI("http://example.org/graphs/" + i),
+                    NodeFactory.createURI("http://example.org/subjects/" + i),
+                    NodeFactory.createURI("http://example.org/predicate"),
+                    NodeFactory.createLiteral(Integer.toString(i), XSDDatatype.XSDinteger));
+            qs.add(new QuadWritable(q));
+        }
+        return qs.iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/AbstractTripleOutputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/AbstractTripleOutputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/AbstractTripleOutputFormatTests.java
new file mode 100644
index 0000000..90eb531
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/AbstractTripleOutputFormatTests.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+import com.hp.hpl.jena.datatypes.xsd.XSDDatatype;
+import com.hp.hpl.jena.graph.NodeFactory;
+import com.hp.hpl.jena.graph.Triple;
+
+/**
+ * Abstract tests for triple output formats
+ * 
+ *
+ */
+public abstract class AbstractTripleOutputFormatTests extends AbstractNodeTupleOutputFormatTests<Triple, TripleWritable> {
+
+    @Override
+    protected Iterator<TripleWritable> generateTuples(int num) {
+        List<TripleWritable> ts = new ArrayList<TripleWritable>();
+        for (int i = 0; i < num; i++) {
+            Triple t = new Triple(NodeFactory.createURI("http://example.org/subjects/" + i),
+                    NodeFactory.createURI("http://example.org/predicate"),
+                    NodeFactory.createLiteral(Integer.toString(i), XSDDatatype.XSDinteger));
+            ts.add(new TripleWritable(t));
+        }
+        return ts.iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/BatchedTriGOutputTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/BatchedTriGOutputTest.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/BatchedTriGOutputTest.java
new file mode 100644
index 0000000..924cac1
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/BatchedTriGOutputTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.io.output.BatchedTriGOutputFormat;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.Lang;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+
+/**
+ * Tests for TriG output
+ * 
+ * 
+ * 
+ */
+@RunWith(Parameterized.class)
+public class BatchedTriGOutputTest extends AbstractQuadOutputFormatTests {
+
+    static long $bs1 = RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE;
+    static long $bs2 = 1000;
+    static long $bs3 = 100;
+    static long $bs4 = 1;
+
+    /**
+     * @return Test parameters
+     */
+    @Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] { { $bs1 }, { $bs2 }, { $bs3 }, { $bs4 } });
+    }
+
+    private final long batchSize;
+
+    /**
+     * Creates new tests
+     * 
+     * @param batchSize
+     *            Batch size
+     */
+    public BatchedTriGOutputTest(long batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    protected String getFileExtension() {
+        return ".trig";
+    }
+
+    @Override
+    protected Lang getRdfLanguage() {
+        return Lang.TRIG;
+    }
+
+    @Override
+    protected Configuration prepareConfiguration() {
+        Configuration config = super.prepareConfiguration();
+        config.setLong(RdfIOConstants.OUTPUT_BATCH_SIZE, this.batchSize);
+        return config;
+    }
+
+    @Override
+    protected OutputFormat<NullWritable, QuadWritable> getOutputFormat() {
+        return new BatchedTriGOutputFormat<NullWritable>();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/BatchedTurtleOutputTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/BatchedTurtleOutputTest.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/BatchedTurtleOutputTest.java
new file mode 100644
index 0000000..f0b8490
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/BatchedTurtleOutputTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+
+/**
+ * Tests for Turtle output
+ * 
+ * 
+ * 
+ */
+@RunWith(Parameterized.class)
+public class BatchedTurtleOutputTest extends AbstractTripleOutputFormatTests {
+
+    static long $bs1 = RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE;
+    static long $bs2 = 1000;
+    static long $bs3 = 100;
+    static long $bs4 = 1;
+
+    /**
+     * @return Test parameters
+     */
+    @Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] { { $bs1 }, { $bs2 }, { $bs3 }, { $bs4 } });
+    }
+
+    private final long batchSize;
+
+    /**
+     * Creates new tests
+     * 
+     * @param batchSize
+     *            Batch size
+     */
+    public BatchedTurtleOutputTest(long batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    protected String getFileExtension() {
+        return ".ttl";
+    }
+
+    @Override
+    protected Lang getRdfLanguage() {
+        return Lang.TURTLE;
+    }
+    
+    @Override
+    protected Configuration prepareConfiguration() {
+        Configuration config = super.prepareConfiguration();
+        config.setLong(RdfIOConstants.OUTPUT_BATCH_SIZE, this.batchSize);
+        return config;
+    }
+
+    @Override
+    protected OutputFormat<NullWritable, TripleWritable> getOutputFormat() {
+        return new BatchedTurtleOutputFormat<NullWritable>();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/NQuadsOutputTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/NQuadsOutputTest.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/NQuadsOutputTest.java
new file mode 100644
index 0000000..b228715
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/NQuadsOutputTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.output.NQuadsOutputFormat;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.Lang;
+
+
+/**
+ * Tests for NQuads output format
+ * 
+ * 
+ * 
+ */
+public class NQuadsOutputTest extends AbstractQuadOutputFormatTests {
+
+    @Override
+    protected String getFileExtension() {
+        return ".nq";
+    }
+
+    @Override
+    protected Lang getRdfLanguage() {
+        return Lang.NQUADS;
+    }
+
+    @Override
+    protected OutputFormat<NullWritable, QuadWritable> getOutputFormat() {
+        return new NQuadsOutputFormat<NullWritable>();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/NTriplesOutputTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/NTriplesOutputTest.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/NTriplesOutputTest.java
new file mode 100644
index 0000000..df1ee97
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/NTriplesOutputTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.output.NTriplesOutputFormat;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+
+
+/**
+ * Tests for NTriples output format
+ * 
+ * 
+ * 
+ */
+public class NTriplesOutputTest extends AbstractTripleOutputFormatTests {
+
+    @Override
+    protected String getFileExtension() {
+        return ".nt";
+    }
+
+    @Override
+    protected Lang getRdfLanguage() {
+        return Lang.NTRIPLES;
+    }
+
+    @Override
+    protected OutputFormat<NullWritable, TripleWritable> getOutputFormat() {
+        return new NTriplesOutputFormat<NullWritable>();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/RdfJsonOutputTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/RdfJsonOutputTest.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/RdfJsonOutputTest.java
new file mode 100644
index 0000000..50947f2
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/RdfJsonOutputTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.output.RdfJsonOutputFormat;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+
+
+/**
+ * Tests for RDF/JSON output
+ * 
+ * 
+ * 
+ */
+public class RdfJsonOutputTest extends AbstractTripleOutputFormatTests {
+
+    @Override
+    protected String getFileExtension() {
+        return ".rj";
+    }
+
+    @Override
+    protected Lang getRdfLanguage() {
+        return Lang.RDFJSON;
+    }
+
+    @Override
+    protected OutputFormat<NullWritable, TripleWritable> getOutputFormat() {
+        return new RdfJsonOutputFormat<NullWritable>();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/RdfXmlOutputTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/RdfXmlOutputTest.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/RdfXmlOutputTest.java
new file mode 100644
index 0000000..128380c
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/RdfXmlOutputTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.output.RdfXmlOutputFormat;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+
+
+/**
+ * Tests for RDF/XML output
+ * 
+ * 
+ * 
+ */
+public class RdfXmlOutputTest extends AbstractTripleOutputFormatTests {
+
+    @Override
+    protected String getFileExtension() {
+        return ".rdf";
+    }
+
+    @Override
+    protected Lang getRdfLanguage() {
+        return Lang.RDFXML;
+    }
+
+    @Override
+    protected OutputFormat<NullWritable, TripleWritable> getOutputFormat() {
+        return new RdfXmlOutputFormat<NullWritable>();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/StreamedTriGOutputTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/StreamedTriGOutputTest.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/StreamedTriGOutputTest.java
new file mode 100644
index 0000000..8a4bd1e
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/StreamedTriGOutputTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.Lang;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+
+/**
+ * Tests for TriG output
+ * 
+ * 
+ * 
+ */
+@RunWith(Parameterized.class)
+public class StreamedTriGOutputTest extends AbstractQuadOutputFormatTests {
+
+    static long $bs1 = RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE;
+    static long $bs2 = 1000;
+    static long $bs3 = 100;
+    static long $bs4 = 1;
+
+    /**
+     * @return Test parameters
+     */
+    @Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] { { $bs1 }, { $bs2 }, { $bs3 }, { $bs4 } });
+    }
+
+    private final long batchSize;
+
+    /**
+     * Creates new tests
+     * 
+     * @param batchSize
+     *            Batch size
+     */
+    public StreamedTriGOutputTest(long batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    protected String getFileExtension() {
+        return ".trig";
+    }
+
+    @Override
+    protected Lang getRdfLanguage() {
+        return Lang.TRIG;
+    }
+    
+    @Override
+    protected Configuration prepareConfiguration() {
+        Configuration config = super.prepareConfiguration();
+        config.setLong(RdfIOConstants.OUTPUT_BATCH_SIZE, this.batchSize);
+        return config;
+    }
+
+    @Override
+    protected OutputFormat<NullWritable, QuadWritable> getOutputFormat() {
+        return new TriGOutputFormat<NullWritable>();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/StreamedTurtleOutputTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/StreamedTurtleOutputTest.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/StreamedTurtleOutputTest.java
new file mode 100644
index 0000000..54d8991
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/StreamedTurtleOutputTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.io.output.TurtleOutputFormat;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+
+/**
+ * Tests for Turtle output
+ * 
+ * 
+ * 
+ */
+@RunWith(Parameterized.class)
+public class StreamedTurtleOutputTest extends AbstractTripleOutputFormatTests {
+
+    static long $bs1 = RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE;
+    static long $bs2 = 1000;
+    static long $bs3 = 100;
+    static long $bs4 = 1;
+
+    /**
+     * @return Test parameters
+     */
+    @Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] { { $bs1 }, { $bs2 }, { $bs3 }, { $bs4 } });
+    }
+
+    private final long batchSize;
+
+    /**
+     * Creates new tests
+     * 
+     * @param batchSize
+     *            Batch size
+     */
+    public StreamedTurtleOutputTest(long batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    protected String getFileExtension() {
+        return ".ttl";
+    }
+
+    @Override
+    protected Lang getRdfLanguage() {
+        return Lang.TURTLE;
+    }
+    
+    @Override
+    protected Configuration prepareConfiguration() {
+        Configuration config = super.prepareConfiguration();
+        config.setLong(RdfIOConstants.OUTPUT_BATCH_SIZE, this.batchSize);
+        return config;
+    }
+
+    @Override
+    protected OutputFormat<NullWritable, TripleWritable> getOutputFormat() {
+        return new TurtleOutputFormat<NullWritable>();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TriGBlankNodeOutputTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TriGBlankNodeOutputTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TriGBlankNodeOutputTests.java
new file mode 100644
index 0000000..388d96a
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TriGBlankNodeOutputTests.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.RDFDataMgr;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.hp.hpl.jena.datatypes.xsd.XSDDatatype;
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.NodeFactory;
+import com.hp.hpl.jena.rdf.model.Model;
+import com.hp.hpl.jena.rdf.model.ResIterator;
+import com.hp.hpl.jena.rdf.model.Resource;
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * Tests for TriG output with blank nodes
+ * 
+ * 
+ * 
+ */
+@RunWith(Parameterized.class)
+public class TriGBlankNodeOutputTests extends StreamedTriGOutputTest {
+
+       static long $bs1 = RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE;
+       static long $bs2 = 1000;
+       static long $bs3 = 100;
+       static long $bs4 = 1;
+
+       /**
+        * @return Test parameters
+        */
+       @Parameters
+       public static Collection<Object[]> data() {
+               return Arrays.asList(new Object[][] { { $bs1 }, { $bs2 }, { $bs3 },
+                               { $bs4 } });
+       }
+
+       /**
+        * Creates new tests
+        * 
+        * @param batchSize
+        *            Batch size
+        */
+       public TriGBlankNodeOutputTests(long batchSize) {
+               super(batchSize);
+       }
+
+       @Override
+       protected Iterator<QuadWritable> generateTuples(int num) {
+               List<QuadWritable> qs = new ArrayList<QuadWritable>();
+               Node subject = NodeFactory.createAnon();
+               for (int i = 0; i < num; i++) {
+                       Quad t = new Quad(
+                                       NodeFactory.createURI("http://example.org/graphs/" + i),
+                                       subject,
+                                       NodeFactory.createURI("http://example.org/predicate"),
+                                       NodeFactory.createLiteral(Integer.toString(i),
+                                                       XSDDatatype.XSDinteger));
+                       qs.add(new QuadWritable(t));
+               }
+               return qs.iterator();
+       }
+
+       @Override
+       protected void checkTuples(File f, long expected) {
+               super.checkTuples(f, expected);
+
+               Model m = RDFDataMgr.loadModel("file://" + f.getAbsolutePath(),
+                               this.getRdfLanguage());
+               ResIterator iter = m.listSubjects();
+               Set<Node> subjects = new HashSet<Node>();
+               while (iter.hasNext()) {
+                       Resource res = iter.next();
+                       Assert.assertTrue(res.isAnon());
+                       subjects.add(res.asNode());
+               }
+               // Should only be one subject unless the data was empty in which case
+               // there will be zero subjects
+               Assert.assertEquals(expected == 0 ? 0 : 1, subjects.size());
+       }
+
+       @Override
+       protected OutputFormat<NullWritable, QuadWritable> getOutputFormat() {
+               return new TriGOutputFormat<NullWritable>();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TurtleBlankNodeOutputTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TurtleBlankNodeOutputTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TurtleBlankNodeOutputTests.java
new file mode 100644
index 0000000..0d75add
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TurtleBlankNodeOutputTests.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.io.output.TurtleOutputFormat;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.RDFDataMgr;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.hp.hpl.jena.datatypes.xsd.XSDDatatype;
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.NodeFactory;
+import com.hp.hpl.jena.graph.Triple;
+import com.hp.hpl.jena.rdf.model.Model;
+import com.hp.hpl.jena.rdf.model.ResIterator;
+import com.hp.hpl.jena.rdf.model.Resource;
+
+/**
+ * Tests for Turtle output with blank nodes
+ * 
+ * 
+ * 
+ */
+@RunWith(Parameterized.class)
+public class TurtleBlankNodeOutputTests extends StreamedTurtleOutputTest {
+
+       static long $bs1 = RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE;
+       static long $bs2 = 1000;
+       static long $bs3 = 100;
+       static long $bs4 = 1;
+
+       /**
+        * @return Test parameters
+        */
+       @Parameters
+       public static Collection<Object[]> data() {
+               return Arrays.asList(new Object[][] { { $bs1 }, { $bs2 }, { $bs3 },
+                               { $bs4 } });
+       }
+
+       /**
+        * Creates new tests
+        * 
+        * @param batchSize
+        *            Batch size
+        */
+       public TurtleBlankNodeOutputTests(long batchSize) {
+               super(batchSize);
+       }
+
+       @Override
+       protected Iterator<TripleWritable> generateTuples(int num) {
+               List<TripleWritable> ts = new ArrayList<TripleWritable>();
+               Node subject = NodeFactory.createAnon();
+               for (int i = 0; i < num; i++) {
+                       Triple t = new Triple(subject,
+                                       NodeFactory.createURI("http://example.org/predicate"),
+                                       NodeFactory.createLiteral(Integer.toString(i),
+                                                       XSDDatatype.XSDinteger));
+                       ts.add(new TripleWritable(t));
+               }
+               return ts.iterator();
+       }
+
+       @Override
+       protected void checkTuples(File f, long expected) {
+               super.checkTuples(f, expected);
+
+               Model m = RDFDataMgr.loadModel("file://" + f.getAbsolutePath(),
+                               this.getRdfLanguage());
+               ResIterator iter = m.listSubjects();
+               Set<Node> subjects = new HashSet<Node>();
+               while (iter.hasNext()) {
+                       Resource res = iter.next();
+                       Assert.assertTrue(res.isAnon());
+                       subjects.add(res.asNode());
+               }
+               // Should only be one subject unless the data was empty in which case
+               // there will be zero subjects
+               Assert.assertEquals(expected == 0 ? 0 : 1, subjects.size());
+       }
+
+       @Override
+       protected OutputFormat<NullWritable, TripleWritable> getOutputFormat() {
+               return new TurtleOutputFormat<NullWritable>();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-mapreduce/pom.xml b/jena-hadoop-rdf/hadoop-rdf-mapreduce/pom.xml
new file mode 100644
index 0000000..b907850
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-mapreduce/pom.xml
@@ -0,0 +1,87 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+ 
+ http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+       <parent>
+               <groupId>org.apache.jena</groupId>
+               <artifactId>jena-hadoop-rdf</artifactId>
+               <version>0.9.0-SNAPSHOT</version>
+       </parent>
+       <artifactId>jena-hadoop-rdf-mapreduce</artifactId>
+       <name>Apache Jena - RDF Tools for Hadoop - Map/Reduce</name>
+       <description>Contains some basic Map/Reduce implementations for working with RDF on Hadoop</description>
+
+       <dependencies>
+               <!-- Internal Project Dependencies -->
+               <dependency>
+                       <groupId>org.apache.jena</groupId>
+                       <artifactId>jena-hadoop-rdf-common</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <!-- Hadoop Dependencies -->
+               <!-- Note these will be provided on the Hadoop cluster hence the provided
+                       scope -->
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-common</artifactId>
+                       <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-mapreduce-client-common</artifactId>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- Jena dependencies -->
+               <dependency>
+                       <groupId>org.apache.jena</groupId>
+                       <artifactId>jena-arq</artifactId>
+               </dependency>
+
+               <!-- Test Dependencies -->
+               <dependency>
+                       <groupId>junit</groupId>
+                       <artifactId>junit</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.mrunit</groupId>
+                       <artifactId>mrunit</artifactId>
+                       <scope>test</scope>
+                       <classifier>hadoop2</classifier>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <!-- JAR plugin to ensure tests jar is built -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyMapper.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyMapper.java b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyMapper.java
new file mode 100644
index 0000000..306a697
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyMapper.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A mapper which discards the value, replacing it with the key
+ * 
+ *
+ * @param <TKey> Key type
+ * @param <TValue> Value type
+ */
+public class KeyMapper<TKey, TValue> extends Mapper<TKey, TValue, TKey, TKey> {
+    private static final Logger LOG = LoggerFactory.getLogger(KeyMapper.class);
+
+    private boolean tracing = false;
+    
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        this.tracing = LOG.isTraceEnabled();
+    }
+
+    @Override
+    protected void map(TKey key, TValue value, Context context) throws IOException,
+            InterruptedException {
+        if (this.tracing) {
+            LOG.trace("Key = {}", key);
+        }
+        context.write(key, key);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyPlusNullMapper.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyPlusNullMapper.java b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyPlusNullMapper.java
new file mode 100644
index 0000000..a6e9a6a
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyPlusNullMapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A mapper which discards the value, replacing it with a null
+ * 
+ *
+ * @param <TKey> Key type
+ * @param <TValue> Value type
+ */
+public class KeyPlusNullMapper<TKey, TValue> extends Mapper<TKey, TValue, TKey, NullWritable> {
+    private static final Logger LOG = LoggerFactory.getLogger(KeyPlusNullMapper.class);
+
+    private boolean tracing = false;
+    
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        this.tracing = LOG.isTraceEnabled();
+    }
+
+    @Override
+    protected void map(TKey key, TValue value, Context context) throws IOException,
+            InterruptedException {
+        if (this.tracing) {
+            LOG.trace("Key = {}", key);
+        }
+        context.write(key, NullWritable.get());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyReducer.java
----------------------------------------------------------------------
diff --git 
a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyReducer.java
 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyReducer.java
new file mode 100644
index 0000000..7805f16
--- /dev/null
+++ 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/KeyReducer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * A reducer that outputs a single pair consisting of the key as both fields, ignoring the values
+ * @author rvesse
+ *
+ * @param <TKey> Key
+ * @param <TValue> Value
+ */
+public class KeyReducer<TKey, TValue> extends Reducer<TKey, TValue, TKey, TKey> {
+
+    @Override
+    protected void reduce(TKey key, Iterable<TValue> values, Context context)
+            throws IOException, InterruptedException {
+        context.write(key, key);
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusKeyMapper.java
----------------------------------------------------------------------
diff --git 
a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusKeyMapper.java
 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusKeyMapper.java
new file mode 100644
index 0000000..7a48c1d
--- /dev/null
+++ 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusKeyMapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A mapper which discards the value, moves the key to the value position, and uses a null key
+ * 
+ *
+ * @param <TKey> Key type
+ * @param <TValue> Value type
+ */
+public class NullPlusKeyMapper<TKey, TValue> extends Mapper<TKey, TValue, NullWritable, TKey> {
+    private static final Logger LOG = LoggerFactory.getLogger(NullPlusKeyMapper.class);
+
+    private boolean tracing = false;
+    
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        this.tracing = LOG.isTraceEnabled();
+    }
+
+    @Override
+    protected void map(TKey key, TValue value, Context context) throws IOException,
+            InterruptedException {
+        if (this.tracing) {
+            LOG.trace("Key = {}", key);
+        }
+        context.write(NullWritable.get(), key);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusKeyReducer.java
----------------------------------------------------------------------
diff --git 
a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusKeyReducer.java
 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusKeyReducer.java
new file mode 100644
index 0000000..dfc6ec1
--- /dev/null
+++ 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusKeyReducer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.mapreduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A reducer that outputs a single pair consisting of a null as the key and the key as the value
+ * @author rvesse
+ *
+ * @param <TKey> Key
+ * @param <TValue> Value
+ */
+public class NullPlusKeyReducer<TKey, TValue> extends Reducer<TKey, TValue, NullWritable, TKey> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(NullPlusKeyReducer.class);
+    private boolean tracing = false;
+    
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        this.tracing = LOGGER.isTraceEnabled();
+    }
+
+    @Override
+    protected void reduce(TKey key, Iterable<TValue> values, Context context)
+            throws IOException, InterruptedException {
+        if (this.tracing) {
+            LOGGER.trace("Input Key = {}", key);
+            Iterator<TValue> iter = values.iterator();
+            while (iter.hasNext()) {
+                LOGGER.trace("Input Value = {}", iter.next());
+            }
+        }
+        context.write(NullWritable.get(), key);
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusValueMapper.java
----------------------------------------------------------------------
diff --git 
a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusValueMapper.java
 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusValueMapper.java
new file mode 100644
index 0000000..a5ac199
--- /dev/null
+++ 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusValueMapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A mapper which discards the key, replacing it with a null and leaving the value as is
+ * 
+ *
+ * @param <TKey> Key type
+ * @param <TValue> Value type
+ */
+public class NullPlusValueMapper<TKey, TValue> extends Mapper<TKey, TValue, NullWritable, TValue> {
+    private static final Logger LOG = LoggerFactory.getLogger(NullPlusValueMapper.class);
+
+    private boolean tracing = false;
+    
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        this.tracing = LOG.isTraceEnabled();
+    }
+
+    @Override
+    protected void map(TKey key, TValue value, Context context) throws IOException,
+            InterruptedException {
+        if (this.tracing) {
+            LOG.trace("Value = {}", value);
+        }
+        context.write(NullWritable.get(), value);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusValueReducer.java
----------------------------------------------------------------------
diff --git 
a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusValueReducer.java
 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusValueReducer.java
new file mode 100644
index 0000000..c6b270f
--- /dev/null
+++ 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/NullPlusValueReducer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.mapreduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A reducer that outputs a pair for each value consisting of a null key and the
+ * value
+ * 
+ * @author rvesse
+ * 
+ * @param <TKey>
+ *            Key
+ * @param <TValue>
+ *            Value
+ */
+public class NullPlusValueReducer<TKey, TValue> extends Reducer<TKey, TValue, NullWritable, TValue> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(NullPlusValueReducer.class);
+    private boolean tracing = false;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        this.tracing = LOGGER.isTraceEnabled();
+    }
+
+    @Override
+    protected void reduce(TKey key, Iterable<TValue> values, Context context) throws IOException, InterruptedException {
+        if (this.tracing) {
+            LOGGER.trace("Input Key = {}", key);
+        }
+        Iterator<TValue> iter = values.iterator();
+        while (iter.hasNext()) {
+            TValue value = iter.next();
+            if (tracing) {
+                LOGGER.trace("Input Value = {}", value);
+            }
+            context.write(NullWritable.get(), value);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/RdfMapReduceConstants.java
----------------------------------------------------------------------
diff --git 
a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/RdfMapReduceConstants.java
 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/RdfMapReduceConstants.java
new file mode 100644
index 0000000..6a8cf18
--- /dev/null
+++ 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/RdfMapReduceConstants.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.mapreduce;
+
+/**
+ * RDF Map/Reduce related constants
+ * 
+ * 
+ * 
+ */
+public class RdfMapReduceConstants {
+
+    /**
+     * Private constructor prevents instantiation
+     */
+    private RdfMapReduceConstants() {
+
+    }
+
+    /**
+     * Configuration key used to set whether the behaviour of the filter mappers
+     * is inverted. When enabled the filter mappers will invert their selection
+     * i.e. tuples that would normally be accepted will be rejected and vice
+     * versa.
+     */
+    public static final String FILTER_INVERT = "rdf.mapreduce.filter.invert";
+
+    /**
+     * Configuration key used to set a comma separated list of predicate URIs
+     * to filter upon
+     */
+    public static final String FILTER_PREDICATE_URIS = "rdf.mapreduce.filter.predicate.uris";
+
+    /**
+     * Configuration key used to set a comma separated list of subject URIs to
+     * filter upon
+     */
+    public static final String FILTER_SUBJECT_URIS = "rdf.mapreduce.filter.subject.uris";
+
+    /**
+     * Configuration key used to set a comma separated list of object URIs to
+     * filter upon
+     */
+    public static final String FILTER_OBJECT_URIS = "rdf.mapreduce.filter.object.uris";
+
+    /**
+     * Configuration key used to set a comma separated list of graph URIs to
+     * filter upon
+     */
+    public static final String FILTER_GRAPH_URIS = "rdf.mapreduce.filter.graph.uris";
+}
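
The filter mappers that read these keys are defined elsewhere in this patch set. As a rough sketch of how the keys above might be used, a driver could set them on the job configuration as shown below; the predicate URIs are placeholders chosen purely for illustration and are not part of this patch.

// Illustrative only: setting the filter keys on a job configuration.
// The URIs used here are placeholders, not part of this patch.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.jena.hadoop.rdf.mapreduce.RdfMapReduceConstants;

public class FilterConfigExample {
    public static Job createJob() throws Exception {
        Configuration config = new Configuration();
        // Comma separated list of predicate URIs the filter mappers should match
        config.set(RdfMapReduceConstants.FILTER_PREDICATE_URIS,
                "http://xmlns.com/foaf/0.1/name,http://xmlns.com/foaf/0.1/knows");
        // Invert the selection: matching tuples are rejected rather than accepted
        config.setBoolean(RdfMapReduceConstants.FILTER_INVERT, true);
        return Job.getInstance(config, "Filtered Triples");
    }
}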

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/SwapMapper.java
----------------------------------------------------------------------
diff --git 
a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/SwapMapper.java
 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/SwapMapper.java
new file mode 100644
index 0000000..ef518a9
--- /dev/null
+++ 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/SwapMapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A mapper which swaps the key and value around
+ * 
+ *
+ * @param <TKey> Key type
+ * @param <TValue> Value type
+ */
+public class SwapMapper<TKey, TValue> extends Mapper<TKey, TValue, TValue, TKey> {
+    private static final Logger LOG = LoggerFactory.getLogger(SwapMapper.class);
+
+    private boolean tracing = false;
+    
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        this.tracing = LOG.isTraceEnabled();
+    }
+
+    @Override
+    protected void map(TKey key, TValue value, Context context) throws IOException,
+            InterruptedException {
+        if (this.tracing) {
+            LOG.trace("Key = {}", key);
+            LOG.trace("Value = {}", value);
+        }
+        context.write(value, key);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/SwapReducer.java
----------------------------------------------------------------------
diff --git 
a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/SwapReducer.java
 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/SwapReducer.java
new file mode 100644
index 0000000..e7e42a0
--- /dev/null
+++ 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/SwapReducer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.mapreduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * A reducer that swaps the key and value
+ * @author rvesse
+ *
+ * @param <TKey> Key
+ * @param <TValue> Value
+ */
+public class SwapReducer<TKey, TValue> extends Reducer<TKey, TValue, TValue, TKey> {
+
+    @Override
+    protected void reduce(TKey key, Iterable<TValue> values, Context context)
+            throws IOException, InterruptedException {
+        Iterator<TValue> iter = values.iterator();
+        while (iter.hasNext()) {
+            context.write(iter.next(), key);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/TextCountReducer.java
----------------------------------------------------------------------
diff --git 
a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/TextCountReducer.java
 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/TextCountReducer.java
new file mode 100644
index 0000000..04b9283
--- /dev/null
+++ 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/TextCountReducer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.mapreduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * A reducer which takes text keys with a sequence of longs representing counts
+ * as the values and sums the counts together into pairs consisting of a text
+ * key and a count value.
+ * 
+ * 
+ * 
+ */
+public class TextCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
+
+    @Override
+    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
+            InterruptedException {
+        long count = 0;
+        Iterator<LongWritable> iter = values.iterator();
+        while (iter.hasNext()) {
+            count += iter.next().get();
+        }
+        context.write(key, new LongWritable(count));
+    }
+
+}
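
Because it simply sums the LongWritable values for each Text key, TextCountReducer can serve as both the combiner and the reducer of a counting job. A brief sketch of such a configuration follows; the choice of mapper is deliberately left as a parameter, since which mapper emits the (Text, LongWritable) pairs is not dictated by this patch.

// Illustrative only: a counting job that sums per-key longs with TextCountReducer.
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.jena.hadoop.rdf.mapreduce.TextCountReducer;

public class TextCountJobExample {
    // mapperClass must emit (Text, LongWritable) pairs; what it counts is up to the caller
    public static void configure(Job job, Class<? extends Mapper> mapperClass) {
        job.setMapperClass(mapperClass);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Summation is associative, so the reducer can also act as the combiner
        job.setCombinerClass(TextCountReducer.class);
        job.setReducerClass(TextCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
    }
}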

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValueMapper.java
----------------------------------------------------------------------
diff --git 
a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValueMapper.java
 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValueMapper.java
new file mode 100644
index 0000000..23ae5f0
--- /dev/null
+++ 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValueMapper.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A mapper which discards the key, replacing it with the value
+ * 
+ *
+ * @param <TKey> Key type
+ * @param <TValue> Value type
+ */
+public class ValueMapper<TKey, TValue> extends Mapper<TKey, TValue, TValue, TValue> {
+    private static final Logger LOG = LoggerFactory.getLogger(ValueMapper.class);
+
+    private boolean tracing = false;
+    
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        this.tracing = LOG.isTraceEnabled();
+    }
+
+    @Override
+    protected void map(TKey key, TValue value, Context context) throws IOException,
+            InterruptedException {
+        if (this.tracing) {
+            LOG.trace("Value = {}", value);
+        }
+        context.write(value, value);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValuePlusNullMapper.java
----------------------------------------------------------------------
diff --git 
a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValuePlusNullMapper.java
 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValuePlusNullMapper.java
new file mode 100644
index 0000000..094fb2d
--- /dev/null
+++ 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValuePlusNullMapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A mapper which discards the key, replacing it with the value, and nulls out the value
+ * 
+ *
+ * @param <TKey> Key type
+ * @param <TValue> Value type
+ */
+public class ValuePlusNullMapper<TKey, TValue> extends Mapper<TKey, TValue, TValue, NullWritable> {
+    private static final Logger LOG = LoggerFactory.getLogger(ValuePlusNullMapper.class);
+
+    private boolean tracing = false;
+    
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        this.tracing = LOG.isTraceEnabled();
+    }
+
+    @Override
+    protected void map(TKey key, TValue value, Context context) throws IOException,
+            InterruptedException {
+        if (this.tracing) {
+            LOG.trace("Value = {}", value);
+        }
+        context.write(value, NullWritable.get());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/92fb810a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValueReducer.java
----------------------------------------------------------------------
diff --git 
a/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValueReducer.java
 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValueReducer.java
new file mode 100644
index 0000000..7d25799
--- /dev/null
+++ 
b/jena-hadoop-rdf/hadoop-rdf-mapreduce/src/main/java/org/apache/jena/hadoop/rdf/mapreduce/ValueReducer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.mapreduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * A reducer that outputs a pair for each value consisting of the value as both the key and value
+ * @author rvesse
+ *
+ * @param <TKey> Key
+ * @param <TValue> Value
+ */
+public class ValueReducer<TKey, TValue> extends Reducer<TKey, TValue, TValue, TValue> {
+
+    @Override
+    protected void reduce(TKey key, Iterable<TValue> values, Context context)
+            throws IOException, InterruptedException {
+        Iterator<TValue> iter = values.iterator();
+        while (iter.hasNext()) {
+            TValue value = iter.next();
+            context.write(value, value);
+        }
+    }
+}
