http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractNodeWriter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractNodeWriter.java
 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractNodeWriter.java
new file mode 100644
index 0000000..5d0826d
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractNodeWriter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.IOException;
+import java.io.Writer;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.atlas.io.AWriter;
+import org.apache.jena.atlas.io.Writer2;
+import org.apache.jena.hadoop.rdf.types.NodeWritable;
+import org.apache.jena.riot.out.NodeFormatter;
+import org.apache.jena.riot.out.NodeFormatterNT;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hp.hpl.jena.graph.Node;
+
+/**
+ * Abstract implementation of a record writer which writes pairs of nodes and
+ * arbitrary values to text based files.
+ * <p>
+ * Each record is written as the formatted node, the separator returned by
+ * {@link #getSeparator()}, the string form of the value (if any) and finally a
+ * newline character.
+ * </p>
+ * 
+ * @param <TValue>
+ *            Value type
+ */
+public abstract class AbstractNodeWriter<TValue> extends RecordWriter<NodeWritable, TValue> {
+
+    /**
+     * Default separator written between nodes and their associated values
+     */
+    public static final String DEFAULT_SEPARATOR = "\t";
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractNodeWriter.class);
+
+    /**
+     * Destination for all output, this is the writer given to the constructor
+     * wrapped via {@link Writer2#wrap(Writer)}
+     */
+    protected AWriter writer;
+    private final NodeFormatter formatter;
+
+    /**
+     * Creates a new node writer using the default NTriples node formatter
+     * 
+     * @param writer
+     *            Writer
+     * @throws NullPointerException
+     *             Thrown if the writer is null
+     */
+    public AbstractNodeWriter(Writer writer) {
+        this(writer, new NodeFormatterNT());
+    }
+
+    /**
+     * Creates a new node writer
+     * 
+     * @param writer
+     *            Writer
+     * @param formatter
+     *            Node formatter
+     * @throws NullPointerException
+     *             Thrown if either argument is null
+     */
+    public AbstractNodeWriter(Writer writer, NodeFormatter formatter) {
+        if (writer == null)
+            throw new NullPointerException("writer cannot be null");
+        if (formatter == null)
+            throw new NullPointerException("formatter cannot be null");
+        this.formatter = formatter;
+        this.writer = Writer2.wrap(writer);
+    }
+
+    /**
+     * Writes a single record as key, separator, value and a trailing newline
+     * 
+     * @param key
+     *            Key
+     * @param value
+     *            Value
+     */
+    @Override
+    public final void write(NodeWritable key, TValue value) throws IOException, InterruptedException {
+        this.writeKey(key);
+        this.writer.write(this.getSeparator());
+        this.writeValue(value);
+        this.writer.write('\n');
+    }
+
+    /**
+     * Writes the given key by formatting the node it holds with the node
+     * formatter
+     * 
+     * @param key
+     *            Key
+     */
+    protected void writeKey(NodeWritable key) {
+        Node n = key.get();
+        this.getNodeFormatter().format(this.writer, n);
+    }
+
+    /**
+     * Writes the given value using its {@link Object#toString()} form, nothing
+     * is written for a {@code null} value or for a {@link NullWritable}
+     * 
+     * @param value
+     *            Value, may be null
+     */
+    protected void writeValue(TValue value) {
+        // A missing value is treated like the NullWritable placeholder rather
+        // than failing with a NullPointerException on toString()
+        if (value == null || value instanceof NullWritable)
+            return;
+        this.writer.write(value.toString());
+    }
+
+    /**
+     * Closes the underlying writer
+     * 
+     * @param context
+     *            Task attempt context, only used for debug logging
+     */
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+        log.debug("close({})", context);
+        writer.close();
+    }
+
+    /**
+     * Gets the node formatter to use for formatting nodes
+     * 
+     * @return Node formatter
+     */
+    protected NodeFormatter getNodeFormatter() {
+        return this.formatter;
+    }
+
+    /**
+     * Gets the separator that is written between a node and its value, this
+     * implementation returns {@link #DEFAULT_SEPARATOR}
+     * 
+     * @return Separator
+     */
+    protected String getSeparator() {
+        return DEFAULT_SEPARATOR;
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractStreamRdfNodeTupleWriter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractStreamRdfNodeTupleWriter.java
 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractStreamRdfNodeTupleWriter.java
new file mode 100644
index 0000000..025312e
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractStreamRdfNodeTupleWriter.java
@@ -0,0 +1,53 @@
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.IOException;
+import java.io.Writer;
+
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
+import org.apache.jena.riot.system.StreamRDF;
+
+/**
+ * Abstract record writer that hands every key value pair to a
+ * {@link StreamRDF}, subclasses decide how a pair is sent to the stream by
+ * implementing {@code sendOutput}.
+ * <p>
+ * The stream is started when the writer is constructed and finished when the
+ * writer is closed.
+ * </p>
+ * 
+ * @param <TKey>
+ *            Key type
+ * @param <TTuple>
+ *            Tuple type carried by the value
+ * @param <TValue>
+ *            Writable tuple type
+ */
+public abstract class AbstractStreamRdfNodeTupleWriter<TKey, TTuple, TValue extends AbstractNodeTupleWritable<TTuple>>
+               extends RecordWriter<TKey, TValue> {
+
+       private final StreamRDF stream;
+       private final Writer writer;
+
+       /**
+        * Creates a new writer and starts the given stream
+        * 
+        * @param stream
+        *            RDF Stream
+        * @param writer
+        *            Writer, closed when this record writer is closed
+        * @throws NullPointerException
+        *             Thrown if either argument is null
+        */
+       public AbstractStreamRdfNodeTupleWriter(StreamRDF stream, Writer writer) {
+               if (stream == null)
+                       throw new NullPointerException("stream cannot be null");
+               if (writer == null)
+                       throw new NullPointerException("writer cannot be null");
+               this.stream = stream;
+               this.stream.start();
+               this.writer = writer;
+       }
+
+       /**
+        * Finishes the stream and then closes the writer
+        * 
+        * @param context
+        *            Task attempt context, not used
+        */
+       @Override
+       public void close(TaskAttemptContext context) throws IOException,
+                       InterruptedException {
+               try {
+                       this.stream.finish();
+               } finally {
+                       // Always release the writer, even if finishing the stream fails
+                       this.writer.close();
+               }
+       }
+
+       /**
+        * Passes the key value pair on to the stream via {@code sendOutput}
+        * 
+        * @param key
+        *            Key
+        * @param value
+        *            Value
+        */
+       @Override
+       public void write(TKey key, TValue value) throws IOException,
+                       InterruptedException {
+               this.sendOutput(key, value, this.stream);
+       }
+
+       /**
+        * Method that handles an actual key value pair passing it to the
+        * {@link StreamRDF} instance as appropriate
+        * 
+        * @param key
+        *            Key
+        * @param value
+        *            Value
+        * @param stream
+        *            RDF Stream
+        */
+       protected abstract void sendOutput(TKey key, TValue value, StreamRDF stream);
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractWholeFileNodeTupleWriter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractWholeFileNodeTupleWriter.java
 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractWholeFileNodeTupleWriter.java
new file mode 100644
index 0000000..d48546b
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractWholeFileNodeTupleWriter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.IOException;
+import java.io.Writer;
+
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An abstract implementation of a record writer that writes records to whole
+ * file formats.
+ * <p>
+ * It is important to note that the writer does not actually write any output
+ * until the {@link #close(TaskAttemptContext)} method is called as it must
+ * write the entire output in one go otherwise the output would be invalid. 
Also
+ * writing in one go increases the chances that the writer will be able to
+ * effectively use the syntax compressions of the RDF serialization being used.
+ * </p>
+ * <p>
+ * The implementation only writes the value portion of the key value pair since
+ * it is the value portion that is used to convey the node tuples
+ * </p>
+ * 
+ * 
+ * 
+ * @param <TKey>
+ * @param <TValue>
+ * @param <T>
+ */
+public abstract class AbstractWholeFileNodeTupleWriter<TKey, TValue, T extends 
AbstractNodeTupleWritable<TValue>> extends
+        RecordWriter<TKey, T> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractWholeFileNodeTupleWriter.class);
+
+    private Writer writer;
+
+    protected AbstractWholeFileNodeTupleWriter(Writer writer) {
+        if (writer == null)
+            throw new NullPointerException("writer cannot be null");
+        this.writer = writer;
+    }
+
+    @Override
+    public final void write(TKey key, T value) throws IOException, 
InterruptedException {
+        LOG.debug("write({}={})", key, value);
+        this.add(value);
+    }
+
+    /**
+     * Adds the tuple to the cache of tuples that will be written when the
+     * {@link #close(TaskAttemptContext)} method is called
+     * 
+     * @param value
+     */
+    protected abstract void add(T value);
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, 
InterruptedException {
+        if (this.writer != null) {
+            this.writeOutput(writer);
+            this.writer.close();
+            this.writer = null;
+        }
+    }
+
+    /**
+     * Writes the cached tuples to the writer, the writer should not be closed
+     * by this method implementation
+     * 
+     * @param writer
+     *            Writer
+     */
+    protected abstract void writeOutput(Writer writer);
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractWholeFileQuadWriter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractWholeFileQuadWriter.java
 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractWholeFileQuadWriter.java
new file mode 100644
index 0000000..5fc0024
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractWholeFileQuadWriter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.Writer;
+
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFDataMgr;
+import org.apache.jena.riot.RDFWriterRegistry;
+
+import com.hp.hpl.jena.sparql.core.DatasetGraph;
+import com.hp.hpl.jena.sparql.core.DatasetGraphFactory;
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * An abstract record writer for whole file triple formats
+ * 
+ * 
+ * 
+ * @param <TKey>
+ *            Key type
+ */
+public abstract class AbstractWholeFileQuadWriter<TKey> extends 
AbstractWholeFileNodeTupleWriter<TKey, Quad, QuadWritable> {
+
+    private DatasetGraph g = DatasetGraphFactory.createMem();
+
+    protected AbstractWholeFileQuadWriter(Writer writer) {
+        super(writer);
+    }
+
+    @Override
+    protected final void add(QuadWritable value) {
+        this.g.add(value.get());
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    protected void writeOutput(Writer writer) {
+        RDFDataMgr.write(writer, this.g, 
RDFWriterRegistry.defaultSerialization(this.getRdfLanguage()));
+    }
+
+    /**
+     * Gets the RDF language to write the output in
+     * 
+     * @return RDF language
+     */
+    protected abstract Lang getRdfLanguage();
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractWholeFileTripleWriter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractWholeFileTripleWriter.java
 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractWholeFileTripleWriter.java
new file mode 100644
index 0000000..bb26093
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractWholeFileTripleWriter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.Writer;
+
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFDataMgr;
+
+import com.hp.hpl.jena.graph.Graph;
+import com.hp.hpl.jena.graph.Triple;
+import com.hp.hpl.jena.sparql.graph.GraphFactory;
+
+/**
+ * An abstract record writer for whole file triple formats
+ * 
+ * 
+ * 
+ * @param <TKey>
+ *            Key type
+ */
+public abstract class AbstractWholeFileTripleWriter<TKey> extends 
AbstractWholeFileNodeTupleWriter<TKey, Triple, TripleWritable> {
+
+    private Graph g = GraphFactory.createDefaultGraph();
+
+    protected AbstractWholeFileTripleWriter(Writer writer) {
+        super(writer);
+    }
+
+    @Override
+    protected final void add(TripleWritable value) {
+        this.g.add(value.get());
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    protected final void writeOutput(Writer writer) {
+        RDFDataMgr.write(writer, this.g, this.getRdfLanguage());
+    }
+
+    /**
+     * Gets the RDF language to write the output in
+     * 
+     * @return RDF language
+     */
+    protected abstract Lang getRdfLanguage();
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/BatchedTriGWriter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/BatchedTriGWriter.java
 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/BatchedTriGWriter.java
new file mode 100644
index 0000000..e567501
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/BatchedTriGWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.Writer;
+
+import org.apache.jena.riot.Lang;
+
+/**
+ * A record writer for TriG that uses the batched approach. Note that this
+ * approach will produce invalid data when blank nodes span batches.
+ * <p>
+ * All writing logic is inherited from {@code AbstractBatchedQuadWriter}, this
+ * class only selects the output language.
+ * </p>
+ * 
+ * @param <TKey>
+ *            Key type
+ */
+public class BatchedTriGWriter<TKey> extends AbstractBatchedQuadWriter<TKey> {
+
+       /**
+        * Creates a new record writer
+        * 
+        * @param writer
+        *            Writer
+        * @param batchSize
+        *            Batch size, passed unchanged to the superclass (presumably
+        *            the number of quads per batch, confirm against
+        *            AbstractBatchedQuadWriter)
+        */
+       public BatchedTriGWriter(Writer writer, long batchSize) {
+               super(writer, batchSize);
+       }
+
+       /**
+        * Gets the RDF language to write the output in
+        * 
+        * @return Always {@link Lang#TRIG}
+        */
+       @Override
+       protected Lang getRdfLanguage() {
+               return Lang.TRIG;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/BatchedTurtleWriter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/BatchedTurtleWriter.java
 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/BatchedTurtleWriter.java
new file mode 100644
index 0000000..333ee86
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/BatchedTurtleWriter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.Writer;
+
+import org.apache.jena.riot.Lang;
+
+/**
+ * A record writer for Turtle that uses the batched approach. Note that this
+ * approach will produce invalid data when blank nodes span batches.
+ * <p>
+ * All writing logic is inherited from {@code AbstractBatchedTripleWriter},
+ * this class only selects the output language.
+ * </p>
+ * 
+ * @param <TKey>
+ *            Key type
+ */
+public class BatchedTurtleWriter<TKey> extends
+               AbstractBatchedTripleWriter<TKey> {
+
+       /**
+        * Creates a new record writer
+        * 
+        * @param writer
+        *            Writer
+        * @param batchSize
+        *            Batch size, passed unchanged to the superclass (presumably
+        *            the number of triples per batch, confirm against
+        *            AbstractBatchedTripleWriter)
+        */
+       public BatchedTurtleWriter(Writer writer, long batchSize) {
+               super(writer, batchSize);
+       }
+
+       /**
+        * Gets the RDF language to write the output in
+        * 
+        * @return Always {@link Lang#TURTLE}
+        */
+       @Override
+       protected Lang getRdfLanguage() {
+               return Lang.TURTLE;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/NQuadsWriter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/NQuadsWriter.java
 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/NQuadsWriter.java
new file mode 100644
index 0000000..97f7b34
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/NQuadsWriter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.Writer;
+
+import org.apache.jena.riot.out.CharSpace;
+import org.apache.jena.riot.out.NodeFormatterNT;
+
+/**
+ * A record writer for NQuads.
+ * <p>
+ * All writing logic is inherited from {@code AbstractLineBasedQuadWriter},
+ * this class only supplies a {@link NodeFormatterNT} as the node formatter.
+ * </p>
+ * 
+ * @param <TKey>
+ *            Key type
+ */
+public class NQuadsWriter<TKey> extends AbstractLineBasedQuadWriter<TKey> {
+
+    /**
+     * Creates a new writer that formats nodes with a default constructed
+     * {@link NodeFormatterNT}
+     * 
+     * @param writer
+     *            Writer
+     */
+    public NQuadsWriter(Writer writer) {
+        super(writer, new NodeFormatterNT());
+    }
+
+    /**
+     * Creates a new writer using the given character space
+     * 
+     * @param writer
+     *            Writer
+     * @param charSpace
+     *            Character space, handed to the {@link NodeFormatterNT} used
+     *            for formatting nodes
+     */
+    public NQuadsWriter(Writer writer, CharSpace charSpace) {
+        super(writer, new NodeFormatterNT(charSpace));
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/NTriplesNodeWriter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/NTriplesNodeWriter.java
 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/NTriplesNodeWriter.java
new file mode 100644
index 0000000..7ff0dfd
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/NTriplesNodeWriter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.Writer;
+
+import org.apache.jena.riot.out.CharSpace;
+import org.apache.jena.riot.out.NodeFormatterNT;
+
+/**
+ * A NTriples based node writer.
+ * <p>
+ * All writing logic is inherited from {@code AbstractNodeWriter}, this class
+ * only chooses how the node formatter is constructed.
+ * </p>
+ * 
+ * @param <TValue>
+ *            Value type
+ */
+public class NTriplesNodeWriter<TValue> extends AbstractNodeWriter<TValue> {
+
+    /**
+     * Creates a new writer, leaving the choice of node formatter to the
+     * superclass
+     * 
+     * @param writer
+     *            Writer
+     */
+    public NTriplesNodeWriter(Writer writer) {
+        super(writer);
+    }
+
+    /**
+     * Creates a new writer using the given character space
+     * 
+     * @param writer
+     *            Writer
+     * @param charSpace
+     *            Character space to use, handed to the {@link NodeFormatterNT}
+     *            used for formatting nodes
+     */
+    public NTriplesNodeWriter(Writer writer, CharSpace charSpace) {
+        super(writer, new NodeFormatterNT(charSpace));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/NTriplesWriter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/NTriplesWriter.java
 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/NTriplesWriter.java
new file mode 100644
index 0000000..db2bf1f
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/NTriplesWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.Writer;
+
+import org.apache.jena.riot.out.CharSpace;
+import org.apache.jena.riot.out.NodeFormatterNT;
+
+/**
+ * A record writer for NTriples.
+ * <p>
+ * All writing logic is inherited from {@code AbstractLineBasedTripleWriter},
+ * this class only supplies a {@link NodeFormatterNT} as the node formatter.
+ * </p>
+ * 
+ * @param <TKey>
+ *            Key type
+ */
+public class NTriplesWriter<TKey> extends AbstractLineBasedTripleWriter<TKey> {
+
+    /**
+     * Creates a new writer that formats nodes with a default constructed
+     * {@link NodeFormatterNT}
+     * 
+     * @param writer
+     *            Writer
+     */
+    public NTriplesWriter(Writer writer) {
+        super(writer, new NodeFormatterNT());
+    }
+
+    /**
+     * Creates a new writer using the given character space
+     * 
+     * @param writer
+     *            Writer
+     * @param charSpace
+     *            Character space, handed to the {@link NodeFormatterNT} used
+     *            for formatting nodes
+     */
+    public NTriplesWriter(Writer writer, CharSpace charSpace) {
+        super(writer, new NodeFormatterNT(charSpace));
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/RdfJsonWriter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/RdfJsonWriter.java
 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/RdfJsonWriter.java
new file mode 100644
index 0000000..e1c6225
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/RdfJsonWriter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.Writer;
+
+import org.apache.jena.riot.Lang;
+
+/**
+ * A record writer for RDF/JSON.
+ * <p>
+ * All writing logic is inherited from {@code AbstractWholeFileTripleWriter},
+ * this class only selects the output language.
+ * </p>
+ * 
+ * @param <TKey>
+ *            Key type
+ */
+public class RdfJsonWriter<TKey> extends AbstractWholeFileTripleWriter<TKey> {
+
+    /**
+     * Creates a new record writer
+     * 
+     * @param writer
+     *            Writer
+     */
+    public RdfJsonWriter(Writer writer) {
+        super(writer);
+    }
+
+    /**
+     * Gets the RDF language to write the output in
+     * 
+     * @return Always {@link Lang#RDFJSON}
+     */
+    @Override
+    protected Lang getRdfLanguage() {
+        return Lang.RDFJSON;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/RdfXmlWriter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/RdfXmlWriter.java
 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/RdfXmlWriter.java
new file mode 100644
index 0000000..707c3e3
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/RdfXmlWriter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.Writer;
+
+import org.apache.jena.riot.Lang;
+
+/**
+ * A record writer for RDF/XML
+ * <p>
+ * Only supplies a constructor and the RDF language, everything else comes
+ * from {@link AbstractWholeFileTripleWriter}
+ * 
+ * @param <TKey>
+ *            Key type
+ * 
+ */
+public class RdfXmlWriter<TKey> extends AbstractWholeFileTripleWriter<TKey> {
+
+    /**
+     * Creates a new record writer
+     * 
+     * @param writer
+     *            Writer, passed straight to the superclass constructor
+     */
+    public RdfXmlWriter(Writer writer) {
+        super(writer);
+    }
+
+    /**
+     * Gets the RDF language of this writer
+     * 
+     * @return Always {@link Lang#RDFXML}
+     */
+    @Override
+    protected Lang getRdfLanguage() {
+        return Lang.RDFXML;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/StreamRdfQuadWriter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/StreamRdfQuadWriter.java
 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/StreamRdfQuadWriter.java
new file mode 100644
index 0000000..62eb926
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/StreamRdfQuadWriter.java
@@ -0,0 +1,27 @@
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.Writer;
+
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.system.StreamRDF;
+
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * A writer for {@link StreamRDF} based quad writers
+ * <p>
+ * Each value written is unwrapped and sent to the stream as a quad
+ * 
+ * @param <TKey>
+ *            Key type
+ */
+public class StreamRdfQuadWriter<TKey> extends
+               AbstractStreamRdfNodeTupleWriter<TKey, Quad, QuadWritable> {
+
+       /**
+        * Creates a new writer
+        * 
+        * @param stream
+        *            RDF stream, passed to the superclass constructor
+        * @param writer
+        *            Writer, passed to the superclass constructor
+        */
+       public StreamRdfQuadWriter(StreamRDF stream, Writer writer) {
+               super(stream, writer);
+       }
+
+       /**
+        * Sends the quad held by the value to the stream, the key is not used
+        */
+       @Override
+       protected void sendOutput(TKey key, QuadWritable value, StreamRDF stream) {
+               stream.quad(value.get());
+       }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/StreamRdfTripleWriter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/StreamRdfTripleWriter.java
 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/StreamRdfTripleWriter.java
new file mode 100644
index 0000000..8ba2cf2
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/StreamRdfTripleWriter.java
@@ -0,0 +1,26 @@
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.Writer;
+
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.system.StreamRDF;
+
+import com.hp.hpl.jena.graph.Triple;
+
+/**
+ * A writer for {@link StreamRDF} based triple writers
+ * <p>
+ * Each value written is unwrapped and sent to the stream as a triple
+ * 
+ * @param <TKey>
+ *            Key type
+ */
+public class StreamRdfTripleWriter<TKey> extends AbstractStreamRdfNodeTupleWriter<TKey, Triple, TripleWritable> {
+
+       /**
+        * Creates a new writer
+        * 
+        * @param stream
+        *            RDF stream, passed to the superclass constructor
+        * @param writer
+        *            Writer, passed to the superclass constructor
+        */
+       public StreamRdfTripleWriter(StreamRDF stream, Writer writer) {
+               super(stream, writer);
+       }
+
+       /**
+        * Sends the triple held by the value to the stream, the key is not used
+        */
+       @Override
+       protected void sendOutput(TKey key, TripleWritable value, StreamRDF stream) {
+               stream.triple(value.get());
+       }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/RdfTriplesInputTestMapper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/RdfTriplesInputTestMapper.java
 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/RdfTriplesInputTestMapper.java
new file mode 100644
index 0000000..5762fb7
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/RdfTriplesInputTestMapper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.log4j.Logger;
+
+
+/**
+ * A test mapper which takes in line based RDF triple input and just produces
+ * triples
+ * <p>
+ * Every input record is logged and then written out unchanged under a null
+ * key
+ * 
+ */
+public class RdfTriplesInputTestMapper extends Mapper<LongWritable, TripleWritable, NullWritable, TripleWritable> {
+
+    private static final Logger LOG = Logger.getLogger(RdfTriplesInputTestMapper.class);
+
+    /**
+     * Logs the incoming key and triple and emits the triple with a
+     * {@link NullWritable} key
+     */
+    @Override
+    protected void map(LongWritable key, TripleWritable value, Context context)
+            throws IOException, InterruptedException {
+        LOG.info("Line " + key.toString() + " => " + value.toString());
+        context.write(NullWritable.get(), value);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedQuadInputFormatTests.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedQuadInputFormatTests.java
 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedQuadInputFormatTests.java
new file mode 100644
index 0000000..1cda0bd
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedQuadInputFormatTests.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+/**
+ * Abstract tests for blocked quad input formats
+ * <p>
+ * Builds on the whole file quad tests but reports that inputs can be split
+ * 
+ */
+public abstract class AbstractBlockedQuadInputFormatTests extends AbstractWholeFileQuadInputFormatTests {
+
+    /**
+     * Indicates that inputs can be split
+     * 
+     * @return Always true
+     */
+    @Override
+    protected boolean canSplitInputs() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedTripleInputFormatTests.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedTripleInputFormatTests.java
 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedTripleInputFormatTests.java
new file mode 100644
index 0000000..2e1e865
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedTripleInputFormatTests.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+/**
+ * Abstract tests for blocked triple input formats
+ * <p>
+ * Builds on the whole file triple tests but reports that inputs can be split
+ * 
+ */
+public abstract class AbstractBlockedTripleInputFormatTests extends AbstractWholeFileTripleInputFormatTests {
+
+    /**
+     * Indicates that inputs can be split
+     * 
+     * @return Always true
+     */
+    @Override
+    protected boolean canSplitInputs() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java
 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java
new file mode 100644
index 0000000..ef1b8d3
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java
@@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.jena.hadoop.rdf.io.HadoopIOConstants;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
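+/**
+ * Abstract node tuple input format tests
+ * 
+ * 
+ * 
+ * @param <TValue>
+ *            Tuple type wrapped by the writable type
+ * @param <T>
+ *            Writable tuple type, used as the value type of the record
+ *            readers under test
+ */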
+public abstract class AbstractNodeTupleInputFormatTests<TValue, T extends 
AbstractNodeTupleWritable<TValue>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeTupleInputFormatTests.class);
+
+    // Number of tuples requested for each generated input file
+    protected static final int EMPTY_SIZE = 0, SMALL_SIZE = 100, LARGE_SIZE = 10000, BAD_SIZE = 100, MIXED_SIZE = 100;
+
+    // Base names of the generated input files, the file extension is appended
+    // to these
+    protected static final String EMPTY = "empty";
+    protected static final String SMALL = "small";
+    protected static final String LARGE = "large";
+    protected static final String BAD = "bad";
+    protected static final String MIXED = "mixed";
+
+    /**
+     * Temporary folder for the tests
+     */
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+
+    // Input files, created afresh before every test
+    protected File empty, small, large, bad, mixed;
+
+    /**
+     * Generates the input files before each test
+     * 
+     * @throws IOException
+     */
+    @Before
+    public void beforeTest() throws IOException {
+        prepareInputs();
+    }
+
+    /**
+     * Deletes the generated input files after each test
+     */
+    @After
+    public void afterTest() {
+        // JUnit removes the temporary folder by itself, deleting the files
+        // explicitly is just belt and braces
+        for (File input : new File[] { empty, small, large, bad, mixed }) {
+            input.delete();
+        }
+    }
+
+    /**
+     * Creates the configuration a test starts from
+     * 
+     * @return A new configuration created with default loading enabled
+     */
+    protected Configuration prepareConfiguration() {
+        // No customisation is needed at this level
+        return new Configuration(true);
+    }
+
+    /**
+     * Prepares the inputs
+     * 
+     * @throws IOException
+     */
+    protected void prepareInputs() throws IOException {
+        String ext = this.getFileExtension();
+        empty = folder.newFile(EMPTY + ext);
+        this.generateTuples(empty, EMPTY_SIZE);
+        small = folder.newFile(SMALL + ext);
+        this.generateTuples(small, SMALL_SIZE);
+        large = folder.newFile(LARGE + ext);
+        this.generateTuples(large, LARGE_SIZE);
+        bad = folder.newFile(BAD + ext);
+        this.generateBadTuples(bad, BAD_SIZE);
+        mixed = folder.newFile(MIXED + ext);
+        this.generateMixedTuples(mixed, MIXED_SIZE);
+    }
+
+    /**
+     * Gets the extra file extension to add to the filenames
+     * <p>
+     * The value is appended as-is to the base file name, so presumably it
+     * should include any leading dot - confirm against implementations
+     * 
+     * @return File extension
+     */
+    protected abstract String getFileExtension();
+
+    /**
+     * Generates tuples used for tests
+     * 
+     * @param f
+     *            File
+     * @param num
+     *            Number of tuples to generate
+     * @throws IOException
+     */
+    protected final void generateTuples(File f, int num) throws IOException {
+        this.generateTuples(this.getWriter(f), num);
+    }
+
+    /**
+     * Gets the writer to use for generating tuples
+     * 
+     * @param f
+     *            File
+     * @return Writer
+     * @throws IOException
+     */
+    protected Writer getWriter(File f) throws IOException {
+        return new FileWriter(f, false);
+    }
+
+    /**
+     * Generates tuples used for tests
+     * <p>
+     * The file based overload never closes the writer it opens, so
+     * implementations are expected to flush and close the writer when done
+     * 
+     * @param writer
+     *            Writer to write to
+     * @param num
+     *            Number of tuples to generate
+     * @throws IOException
+     */
+    protected abstract void generateTuples(Writer writer, int num) throws IOException;
+
+    /**
+     * Generates bad tuples used for tests
+     * <p>
+     * The writer opened for the file is not closed here, that is left to the
+     * writer based overload
+     * 
+     * @param f
+     *            File
+     * @param num
+     *            Number of bad tuples to generate
+     * @throws IOException
+     */
+    protected final void generateBadTuples(File f, int num) throws IOException {
+        this.generateBadTuples(this.getWriter(f), num);
+    }
+
+    /**
+     * Generates bad tuples used for tests
+     * <p>
+     * The file based overload never closes the writer it opens, so
+     * implementations are expected to flush and close the writer when done
+     * 
+     * @param writer
+     *            Writer to write to
+     * @param num
+     *            Number of bad tuples to generate
+     * @throws IOException
+     */
+    protected abstract void generateBadTuples(Writer writer, int num) throws IOException;
+
+    /**
+     * Generates a mixture of good and bad tuples used for tests
+     * <p>
+     * The writer opened for the file is not closed here, that is left to the
+     * writer based overload
+     * 
+     * @param f
+     *            File
+     * @param num
+     *            Number of tuples to generate, they should be a 50/50 mix of
+     *            good and bad tuples
+     * @throws IOException
+     */
+    protected final void generateMixedTuples(File f, int num) throws IOException {
+        this.generateMixedTuples(this.getWriter(f), num);
+    }
+
+    /**
+     * Generates a mixture of good and bad tuples used for tests
+     * <p>
+     * The file based overload never closes the writer it opens, so
+     * implementations are expected to flush and close the writer when done
+     * 
+     * @param writer
+     *            Writer to write to
+     * @param num
+     *            Number of tuples to generate, they should be a 50/50 mix of
+     *            good and bad tuples
+     * @throws IOException
+     */
+    protected abstract void generateMixedTuples(Writer writer, int num) throws IOException;
+
+    /**
+     * Adds an input path to the job configuration
+     * <p>
+     * The file is resolved against the local file system and registered with
+     * the job as a fully qualified path
+     * 
+     * @param f
+     *            File
+     * @param config
+     *            Configuration, used to obtain the local file system
+     * @param job
+     *            Job
+     * @throws IOException
+     */
+    protected void addInputPath(File f, Configuration config, Job job) throws IOException {
+        FileSystem fs = FileSystem.getLocal(config);
+        Path inputPath = fs.makeQualified(new Path(f.getAbsolutePath()));
+        FileInputFormat.addInputPath(job, inputPath);
+    }
+
+    /**
+     * Reads a record reader to exhaustion, checking the progress it reports
+     * along the way, and then closes it
+     * <p>
+     * Progress must be exactly 0.0 or 1.0 before the first read (and when it
+     * is 1.0 the reader must have no records), greater than 0.0 and at most
+     * 1.0 after every successful read, and exactly 1.0 once the reader has
+     * been closed
+     * 
+     * @param reader
+     *            Record reader, callers in this class initialize it first
+     * @return Number of tuples read
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    protected final int countTuples(RecordReader<LongWritable, T> reader) throws IOException, InterruptedException {
+        int count = 0;
+
+        // Check initial progress
+        LOG.info(String.format("Initial Reported Progress %f", reader.getProgress()));
+        float progress = reader.getProgress();
+        if (Float.compare(0.0f, progress) == 0) {
+            Assert.assertEquals(0.0d, reader.getProgress(), 0.0d);
+        } else if (Float.compare(1.0f, progress) == 0) {
+            // If reader is reported 1.0 straight away then we expect there to
+            // be no key values
+            Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);
+            Assert.assertFalse(reader.nextKeyValue());
+        } else {
+            Assert.fail(String.format(
+                    "Expected progress of 0.0 or 1.0 before reader has been accessed for first time but got %f", progress));
+        }
+
+        // Count tuples
+        boolean debug = LOG.isDebugEnabled();
+        while (reader.nextKeyValue()) {
+            count++;
+            progress = reader.getProgress();
+            if (debug) {
+                LOG.debug(String.format("Current Reported Progress %f", progress));
+            }
+            Assert.assertTrue(String.format("Progress should be in the range 0.0 < p <= 1.0 but got %f", progress),
+                    progress > 0.0f && progress <= 1.0f);
+        }
+        reader.close();
+        LOG.info(String.format("Got %d tuples from this record reader", count));
+
+        // Check final progress, deliberately done after the reader is closed
+        LOG.info(String.format("Final Reported Progress %f", reader.getProgress()));
+        Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);
+
+        return count;
+    }
+
+    protected final void checkTuples(RecordReader<LongWritable, T> reader, int 
expected) throws IOException, InterruptedException {
+        Assert.assertEquals(expected, this.countTuples(reader));
+    }
+
+    /**
+     * Runs a test with a single input using a freshly prepared configuration
+     * 
+     * @param input
+     *            Input
+     * @param expectedSplits
+     *            Expected number of splits
+     * @param expectedTuples
+     *            Expected tuples
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    protected final void testSingleInput(File input, int expectedSplits, int expectedTuples) throws IOException,
+            InterruptedException {
+        // Prepare configuration
+        Configuration config = this.prepareConfiguration();
+        this.testSingleInput(config, input, expectedSplits, expectedTuples);
+    }
+
+    /**
+     * Runs a test with a single input
+     * <p>
+     * The number of lines per split is set to {@link #LARGE_SIZE} and every
+     * split produced is expected to yield the expected number of tuples
+     * 
+     * @param config
+     *            Configuration
+     * @param input
+     *            Input
+     * @param expectedSplits
+     *            Expected number of splits
+     * @param expectedTuples
+     *            Expected tuples, checked against each split individually
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    protected final void testSingleInput(Configuration config, File input, int expectedSplits, int expectedTuples)
+            throws IOException, InterruptedException {
+        // Set up fake job
+        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
+        Job job = Job.getInstance(config);
+        job.setInputFormatClass(inputFormat.getClass());
+        this.addInputPath(input, job.getConfiguration(), job);
+        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+        Assert.assertEquals(1, FileInputFormat.getInputPaths(context).length);
+        NLineInputFormat.setNumLinesPerSplit(job, LARGE_SIZE);
+
+        // Check splits
+        List<InputSplit> splits = inputFormat.getSplits(context);
+        Assert.assertEquals(expectedSplits, splits.size());
+
+        // Check tuples
+        for (InputSplit split : splits) {
+            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
+            reader.initialize(split, taskContext);
+            this.checkTuples(reader, expectedTuples);
+        }
+    }
+
+    /**
+     * Gets the input format to test
+     * 
+     * @return Input format
+     */
+    protected abstract InputFormat<LongWritable, T> getInputFormat();
+
+    /**
+     * Single input test using the empty file, expects no splits when inputs
+     * can be split and one split otherwise
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void single_input_01() throws IOException, InterruptedException, ClassNotFoundException {
+        testSingleInput(empty, this.canSplitInputs() ? 0 : 1, EMPTY_SIZE);
+    }
+
+    /**
+     * Single input test using the small file, expects a single split holding
+     * all of its tuples
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void single_input_02() throws IOException, InterruptedException, ClassNotFoundException {
+        testSingleInput(small, 1, SMALL_SIZE);
+    }
+
+    /**
+     * Single input test using the large file, expects a single split holding
+     * all of its tuples
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void single_input_03() throws IOException, InterruptedException, ClassNotFoundException {
+        testSingleInput(large, 1, LARGE_SIZE);
+    }
+
+    /**
+     * Single input test using the bad file with the default configuration,
+     * expects a single split that yields no tuples
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void single_input_04() throws IOException, InterruptedException, ClassNotFoundException {
+        testSingleInput(bad, 1, 0);
+    }
+
+    /**
+     * Single input test using the mixed file with the default configuration,
+     * expects a single split that yields only the good half of the tuples
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void single_input_05() throws IOException, InterruptedException, ClassNotFoundException {
+        testSingleInput(mixed, 1, MIXED_SIZE / 2);
+    }
+
+    /**
+     * Tests behaviour when ignoring bad tuples is disabled, reading the bad
+     * file is expected to fail with an {@link IOException}
+     * 
+     * @throws InterruptedException
+     * @throws IOException
+     */
+    @Test(expected = IOException.class)
+    public final void fail_on_bad_input_01() throws IOException, InterruptedException {
+        Configuration config = this.prepareConfiguration();
+        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+        // Sanity check that the setting took effect
+        Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
+        testSingleInput(config, bad, 1, 0);
+    }
+
+    /**
+     * Tests behaviour when ignoring bad tuples is disabled, reading the mixed
+     * file is expected to fail with an {@link IOException}
+     * 
+     * @throws InterruptedException
+     * @throws IOException
+     */
+    @Test(expected = IOException.class)
+    public final void fail_on_bad_input_02() throws IOException, InterruptedException {
+        Configuration config = this.prepareConfiguration();
+        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+        // Sanity check that the setting took effect
+        Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
+        testSingleInput(config, mixed, 1, MIXED_SIZE / 2);
+    }
+
+    /**
+     * Runs a multiple input test
+     * <p>
+     * The number of lines per split is set to the expected number of tuples
+     * and the tuple count is checked as a total across all splits
+     * 
+     * @param inputs
+     *            Inputs
+     * @param expectedSplits
+     *            Number of splits expected
+     * @param expectedTuples
+     *            Number of tuples expected
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    protected final void testMultipleInputs(File[] inputs, int expectedSplits, int expectedTuples) throws IOException,
+            InterruptedException {
+        // Prepare configuration and inputs
+        Configuration config = this.prepareConfiguration();
+
+        // Set up fake job
+        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
+        Job job = Job.getInstance(config);
+        job.setInputFormatClass(inputFormat.getClass());
+        for (File input : inputs) {
+            this.addInputPath(input, job.getConfiguration(), job);
+        }
+        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+        Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
+        NLineInputFormat.setNumLinesPerSplit(job, expectedTuples);
+
+        // Check splits
+        List<InputSplit> splits = inputFormat.getSplits(context);
+        Assert.assertEquals(expectedSplits, splits.size());
+
+        // Check tuples
+        int count = 0;
+        for (InputSplit split : splits) {
+            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
+            reader.initialize(split, taskContext);
+            count += this.countTuples(reader);
+        }
+        Assert.assertEquals(expectedTuples, count);
+    }
+
+    /**
+     * Tuples test with the empty, small and large files as separate inputs,
+     * expects two splits when inputs can be split and three otherwise
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void multiple_inputs_01() throws IOException, InterruptedException, ClassNotFoundException {
+        testMultipleInputs(new File[] { empty, small, large }, this.canSplitInputs() ? 2 : 3,
+                EMPTY_SIZE + SMALL_SIZE + LARGE_SIZE);
+    }
+
+    /**
+     * Tuples test with the temporary folder itself as the only input path
+     * <p>
+     * The expected tuple total is the empty, small and large sizes plus half
+     * of the mixed size, with nothing counted for the bad file; four splits
+     * are expected when inputs can be split and five otherwise
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void multiple_inputs_02() throws IOException, InterruptedException, ClassNotFoundException {
+        testMultipleInputs(new File[] { folder.getRoot() }, this.canSplitInputs() ? 4 : 5,
+                EMPTY_SIZE + SMALL_SIZE + LARGE_SIZE + (MIXED_SIZE / 2));
+    }
+
+    /**
+     * Runs a split input test
+     * <p>
+     * Unlike the multiple input test this does not set the number of lines
+     * per split itself, whatever the supplied configuration says is used.
+     * Every split is validated with
+     * {@link #isValidSplit(InputSplit, Configuration)} before it is read and
+     * the tuple count is checked as a total across all splits
+     * 
+     * @param config
+     *            Configuration
+     * @param inputs
+     *            Inputs
+     * @param expectedSplits
+     *            Number of splits expected
+     * @param expectedTuples
+     *            Number of tuples expected
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    protected final void testSplitInputs(Configuration config, File[] inputs, int expectedSplits, int expectedTuples)
+            throws IOException, InterruptedException {
+        // Set up fake job
+        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
+        Job job = Job.getInstance(config);
+        job.setInputFormatClass(inputFormat.getClass());
+        for (File input : inputs) {
+            this.addInputPath(input, job.getConfiguration(), job);
+        }
+        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+        Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
+
+        // Check splits
+        List<InputSplit> splits = inputFormat.getSplits(context);
+        Assert.assertEquals(expectedSplits, splits.size());
+
+        // Check tuples
+        int count = 0;
+        for (InputSplit split : splits) {
+            // Validate split
+            Assert.assertTrue(this.isValidSplit(split, config));
+
+            // Read split
+            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
+            reader.initialize(split, taskContext);
+            count += this.countTuples(reader);
+        }
+        Assert.assertEquals(expectedTuples, count);
+    }
+
+    /**
+     * Determines whether an input split is valid, by default any
+     * {@link FileSplit} is accepted
+     * 
+     * @param split
+     *            Input split
+     * @param config
+     *            Configuration, not used by this default implementation
+     * @return True if a valid split, false otherwise
+     * @throws IOException
+     */
+    protected boolean isValidSplit(InputSplit split, Configuration config) throws IOException {
+        return split instanceof FileSplit;
+    }
+
+    /**
+     * Indicates whether inputs can be split, defaults to true
+     * <p>
+     * The split input tests are skipped when this returns false and some of
+     * the expected split counts in the other tests depend on it
+     * 
+     * @return Whether inputs can be split
+     */
+    protected boolean canSplitInputs() {
+        return true;
+    }
+
+    /**
+     * Tests for input splitting, the small file is expected to produce 100
+     * splits when the number of lines per split is left unset
+     * <p>
+     * Skipped when inputs cannot be split
+     * 
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public final void split_input_01() throws IOException, InterruptedException, ClassNotFoundException {
+        Assume.assumeTrue(this.canSplitInputs());
+
+        Configuration config = this.prepareConfiguration();
+        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+        // Only passes when no maximum line length has been configured
+        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
+        this.testSplitInputs(config, new File[] { small }, 100, SMALL_SIZE);
+    }
+
+    /**
+     * Tests for input splitting, the small file is expected to produce 10
+     * splits when the number of lines per split is set to 10
+     * <p>
+     * Skipped when inputs cannot be split
+     * 
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public final void split_input_02() throws IOException, InterruptedException, ClassNotFoundException {
+        Assume.assumeTrue(this.canSplitInputs());
+
+        Configuration config = this.prepareConfiguration();
+        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+        config.setLong(NLineInputFormat.LINES_PER_MAP, 10);
+        // Only passes when no maximum line length has been configured
+        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
+        this.testSplitInputs(config, new File[] { small }, 10, SMALL_SIZE);
+    }
+
+    /**
+     * Tests for input splitting, the large file is expected to produce 100
+     * splits when the number of lines per split is set to 100
+     * <p>
+     * Skipped when inputs cannot be split
+     * 
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public final void split_input_03() throws IOException, InterruptedException, ClassNotFoundException {
+        Assume.assumeTrue(this.canSplitInputs());
+
+        Configuration config = this.prepareConfiguration();
+        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+        config.setLong(NLineInputFormat.LINES_PER_MAP, 100);
+        // Only passes when no maximum line length has been configured
+        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
+        this.testSplitInputs(config, new File[] { large }, 100, LARGE_SIZE);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java
 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java
new file mode 100644
index 0000000..4dd396b
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+import java.io.Writer;
+
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * Abstract tests for Quad input formats
+ * 
+ *
+ */
+public abstract class AbstractQuadsInputFormatTests extends 
AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+
+    @Override
+    protected void generateTuples(Writer writer, int num) throws IOException {
+        for (int i = 0; i < num; i++) {
+            writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n");
+        }
+        writer.flush();
+        writer.close();
+    }
+    
+    @Override
+    protected void generateBadTuples(Writer writer, int num) throws 
IOException {
+        for (int i = 0; i < num; i++) {
+            writer.write("<http://broken\n");
+        }
+        writer.flush();
+        writer.close();
+    }
+
+    @Override
+    protected void generateMixedTuples(Writer writer, int num) throws 
IOException {
+        boolean bad = false;
+        for (int i = 0; i < num; i++, bad = !bad) {
+            if (bad) {
+                writer.write("<http://broken\n");
+            } else {
+                writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n");
+            }
+        }
+        writer.flush();
+        writer.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java
 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java
new file mode 100644
index 0000000..04572d3
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+import java.io.Writer;
+
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+import com.hp.hpl.jena.graph.Triple;
+
+/**
+ * Abstract tests for Triple input formats
+ * 
+ * 
+ * 
+ */
+public abstract class AbstractTriplesInputFormatTests extends 
AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
+
+    @Override
+    protected void generateTuples(Writer writer, int num) throws IOException {
+        for (int i = 0; i < num; i++) {
+            writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n");
+        }
+        writer.flush();
+        writer.close();
+    }
+
+    @Override
+    protected void generateBadTuples(Writer writer, int num) throws 
IOException {
+        for (int i = 0; i < num; i++) {
+            writer.write("<http://broken\n");
+        }
+        writer.flush();
+        writer.close();
+    }
+
+    @Override
+    protected void generateMixedTuples(Writer writer, int num) throws 
IOException {
+        boolean bad = false;
+        for (int i = 0; i < num; i++, bad = !bad) {
+            if (bad) {
+                writer.write("<http://broken\n");
+            } else {
+                writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n");
+            }
+        }
+        writer.flush();
+        writer.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java
 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java
new file mode 100644
index 0000000..3f2c8d2
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+import java.io.Writer;
+
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFDataMgr;
+import org.apache.jena.riot.RDFWriterRegistry;
+
+import com.hp.hpl.jena.query.Dataset;
+import com.hp.hpl.jena.query.DatasetFactory;
+import com.hp.hpl.jena.rdf.model.Model;
+import com.hp.hpl.jena.rdf.model.ModelFactory;
+import com.hp.hpl.jena.rdf.model.Property;
+import com.hp.hpl.jena.rdf.model.Resource;
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * Abstract tests for whole file Quad input formats
+ * 
+ * 
+ * 
+ */
+public abstract class AbstractWholeFileQuadInputFormatTests extends 
AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+
+    @Override
+    protected boolean canSplitInputs() {
+        return false;
+    }
+
+    @SuppressWarnings("deprecation")
+    private void writeTuples(Dataset ds, Writer writer) {
+        RDFDataMgr.write(writer, ds, 
RDFWriterRegistry.defaultSerialization(this.getRdfLanguage()));
+    }
+
+    /**
+     * Gets the RDF language in which the generated tuples are written out
+     * 
+     * @return RDF language
+     */
+    protected abstract Lang getRdfLanguage();
+
+    private void writeGoodTuples(Writer writer, int num) throws IOException {
+        Dataset ds = DatasetFactory.createMem();
+        Model m = ModelFactory.createDefaultModel();
+        Resource currSubj = m.createResource("http://example.org/subjects/0");
+        Property predicate = m.createProperty("http://example.org/predicate");
+        for (int i = 0; i < num; i++) {
+            if (i % 100 == 0) {
+                ds.addNamedModel("http://example.org/graphs/" + (i / 100), m);
+                m = ModelFactory.createDefaultModel();
+            }
+            if (i % 10 == 0) {
+                currSubj = m.createResource("http://example.org/subjects/" + (i / 10));
+            }
+            m.add(currSubj, predicate, m.createTypedLiteral(i));
+        }
+        if (!m.isEmpty()) {
+            ds.addNamedModel("http://example.org/graphs/extra", m);
+        }
+        this.writeTuples(ds, writer);
+    }
+
+    @Override
+    protected final void generateTuples(Writer writer, int num) throws 
IOException {
+        this.writeGoodTuples(writer, num);
+        writer.close();
+    }
+
+    @Override
+    protected final void generateMixedTuples(Writer writer, int num) throws 
IOException {
+        // Write good data
+        this.writeGoodTuples(writer, num / 2);
+
+        // Write junk data
+        for (int i = 0; i < num / 2; i++) {
+            writer.write("junk data\n");
+        }
+
+        writer.flush();
+        writer.close();
+    }
+
+    @Override
+    protected final void generateBadTuples(Writer writer, int num) throws 
IOException {
+        for (int i = 0; i < num; i++) {
+            writer.write("junk data\n");
+        }
+        writer.flush();
+        writer.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java
 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java
new file mode 100644
index 0000000..bacd7ba
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+import java.io.Writer;
+
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFDataMgr;
+
+import com.hp.hpl.jena.graph.Triple;
+import com.hp.hpl.jena.rdf.model.Model;
+import com.hp.hpl.jena.rdf.model.ModelFactory;
+import com.hp.hpl.jena.rdf.model.Property;
+import com.hp.hpl.jena.rdf.model.Resource;
+
+/**
+ * Abstract tests for whole file Triple input formats
+ * 
+ * 
+ * 
+ */
+public abstract class AbstractWholeFileTripleInputFormatTests extends 
AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
+
+    @Override
+    protected boolean canSplitInputs() {
+        return false;
+    }
+    
+    @SuppressWarnings("deprecation")
+    private void writeTuples(Model m, Writer writer) {
+        RDFDataMgr.write(writer, m, this.getRdfLanguage());
+    }
+    
+    /**
+     * Gets the RDF language in which the generated tuples are written out
+     * @return RDF language
+     */
+    protected abstract Lang getRdfLanguage();
+    
+    @Override
+    protected final void generateTuples(Writer writer, int num) throws 
IOException {
+        Model m = ModelFactory.createDefaultModel();
+        Resource currSubj = m.createResource("http://example.org/subjects/0");
+        Property predicate = m.createProperty("http://example.org/predicate");
+        for (int i = 0; i < num; i++) {
+            if (i % 10 == 0) {
+                currSubj = m.createResource("http://example.org/subjects/" + (i / 10));
+            }
+            m.add(currSubj, predicate, m.createTypedLiteral(i));
+        }
+        this.writeTuples(m, writer);
+        writer.close();
+    }
+    
+    @Override
+    protected final void generateMixedTuples(Writer writer, int num) throws 
IOException {
+        // Write good data
+        Model m = ModelFactory.createDefaultModel();
+        Resource currSubj = m.createResource("http://example.org/subjects/0");
+        Property predicate = m.createProperty("http://example.org/predicate");
+        for (int i = 0; i < num / 2; i++) {
+            if (i % 10 == 0) {
+                currSubj = m.createResource("http://example.org/subjects/" + (i / 10));
+            }
+            m.add(currSubj, predicate, m.createTypedLiteral(i));
+        }
+        this.writeTuples(m, writer);
+        
+        // Write junk data
+        for (int i = 0; i < num / 2; i++) {
+            writer.write("junk data\n");
+        }
+        
+        writer.flush();
+        writer.close();
+    }
+
+    @Override
+    protected final void generateBadTuples(Writer writer, int num) throws 
IOException {
+        for (int i = 0; i < num; i++) {
+            writer.write("junk data\n");
+        }
+        writer.flush();
+        writer.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/BlockedNQuadsInputTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/BlockedNQuadsInputTest.java
 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/BlockedNQuadsInputTest.java
new file mode 100644
index 0000000..9fe5a1e
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/BlockedNQuadsInputTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.jena.hadoop.rdf.io.input.BlockedNQuadsInputFormat;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.Lang;
+
+
+/**
+ * Tests for blocked NQuads input
+ * 
+ * 
+ * 
+ */
+public class BlockedNQuadsInputTest extends 
AbstractBlockedQuadInputFormatTests {
+
+    @Override
+    protected Lang getRdfLanguage() {
+        return Lang.NQUADS;
+    }
+
+    @Override
+    protected String getFileExtension() {
+        return ".nq";
+    }
+
+    @Override
+    protected InputFormat<LongWritable, QuadWritable> getInputFormat() {
+        return new BlockedNQuadsInputFormat();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/BlockedNTriplesInputTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/BlockedNTriplesInputTest.java
 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/BlockedNTriplesInputTest.java
new file mode 100644
index 0000000..8b1da66
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/BlockedNTriplesInputTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.jena.hadoop.rdf.io.input.BlockedNTriplesInputFormat;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+
+
+/**
+ * Tests for blocked NTriples input
+ * 
+ *
+ */
+public class BlockedNTriplesInputTest extends 
AbstractBlockedTripleInputFormatTests {
+
+    @Override
+    protected Lang getRdfLanguage() {
+        return Lang.NTRIPLES;
+    }
+
+    @Override
+    protected String getFileExtension() {
+        return ".nt";
+    }
+
+    @Override
+    protected InputFormat<LongWritable, TripleWritable> getInputFormat() {
+        return new BlockedNTriplesInputFormat();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/NQuadsInputTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/NQuadsInputTest.java
 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/NQuadsInputTest.java
new file mode 100644
index 0000000..558db2d
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/NQuadsInputTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.jena.hadoop.rdf.io.input.NQuadsInputFormat;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+
+/**
+ * Tests for the NQuads input format
+ * 
+ *
+ */
+public class NQuadsInputTest extends AbstractQuadsInputFormatTests {
+
+    @Override
+    protected InputFormat<LongWritable, QuadWritable> getInputFormat() {
+        return new NQuadsInputFormat();
+    }
+
+    @Override
+    protected String getFileExtension() {
+        return ".nq";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/NTriplesInputTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/NTriplesInputTest.java
 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/NTriplesInputTest.java
new file mode 100644
index 0000000..0fe4077
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/NTriplesInputTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.jena.hadoop.rdf.io.input.NTriplesInputFormat;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+
+/**
+ * Tests for the {@link NTriplesInputFormat}
+ * 
+ * 
+ * 
+ */
+public class NTriplesInputTest extends AbstractTriplesInputFormatTests {
+
+    @Override
+    protected InputFormat<LongWritable, TripleWritable> getInputFormat() {
+        return new NTriplesInputFormat();
+    }
+
+    @Override
+    protected String getFileExtension() {
+        return ".nt";
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/05c389be/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/RdfJsonInputTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/RdfJsonInputTest.java
 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/RdfJsonInputTest.java
new file mode 100644
index 0000000..71c73a9
--- /dev/null
+++ 
b/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/RdfJsonInputTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.jena.hadoop.rdf.io.input.RdfJsonInputFormat;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+
+
+/**
+ * Tests for RDF/JSON input
+ * 
+ * 
+ * 
+ */
+public class RdfJsonInputTest extends AbstractWholeFileTripleInputFormatTests {
+
+    @Override
+    protected Lang getRdfLanguage() {
+        return Lang.RDFJSON;
+    }
+
+    @Override
+    protected String getFileExtension() {
+        return ".rj";
+    }
+
+    @Override
+    protected InputFormat<LongWritable, TripleWritable> getInputFormat() {
+        return new RdfJsonInputFormat();
+    }
+
+}

Reply via email to