/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.types;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparator;
import org.apache.jena.hadoop.rdf.types.comparators.SimpleBinaryComparator;
import org.apache.jena.hadoop.rdf.types.converters.ThriftConverter;
import org.apache.jena.riot.thrift.ThriftConvert;
import org.apache.jena.riot.thrift.wire.RDF_Quad;
import org.apache.thrift.TException;

import com.hp.hpl.jena.graph.Node;
import com.hp.hpl.jena.sparql.core.Quad;

/**
 * A writable quad whose binary form is a length prefixed Thrift encoding of
 * the quad
 */
public class QuadWritable extends AbstractNodeTupleWritable<Quad> {

    static {
        // The Thrift encodings compare correctly as raw bytes so register a
        // plain binary comparator for this type
        WritableComparator.define(QuadWritable.class, new SimpleBinaryComparator());
    }

    // Cached Thrift form; rebuilt lazily by write() and invalidated by set()
    private RDF_Quad quad = new RDF_Quad();

    /**
     * Creates a new empty instance
     */
    public QuadWritable() {
        this(null);
    }

    /**
     * Creates a new instance with the given value
     * 
     * @param q
     *            Quad
     */
    public QuadWritable(Quad q) {
        super(q);
    }

    /**
     * Creates a new instance from the given input
     * 
     * @param input
     *            Input
     * @return New instance
     * @throws IOException
     */
    public static QuadWritable read(DataInput input) throws IOException {
        QuadWritable instance = new QuadWritable();
        instance.readFields(input);
        return instance;
    }

    @Override
    public void set(Quad tuple) {
        super.set(tuple);
        // The cached Thrift quad no longer reflects the tuple, clear it so
        // that write() rebuilds it from the new value
        this.quad.clear();
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        this.quad.clear();
        // Payload is length prefixed
        int length = input.readInt();
        byte[] data = new byte[length];
        input.readFully(data);
        try {
            ThriftConverter.fromBytes(data, this.quad);
        } catch (TException e) {
            throw new IOException(e);
        }
        Node g = ThriftConvert.convert(this.quad.getG());
        Node s = ThriftConvert.convert(this.quad.getS());
        Node p = ThriftConvert.convert(this.quad.getP());
        Node o = ThriftConvert.convert(this.quad.getO());
        this.setInternal(new Quad(g, s, p, o));
    }

    @Override
    public void write(DataOutput output) throws IOException {
        if (this.get() == null)
            throw new IOException(
                    "Null quads cannot be written using this class, consider using NodeTupleWritable instead");

        this.prepareThriftQuad();

        byte[] data;
        try {
            data = ThriftConverter.toBytes(this.quad);
        } catch (TException e) {
            throw new IOException(e);
        }
        output.writeInt(data.length);
        output.write(data);
    }

    /**
     * Populates the cached Thrift quad from the current tuple unless it has
     * already been prepared
     */
    private void prepareThriftQuad() {
        if (this.quad.isSetS())
            return;
        Quad tuple = this.get();
        this.quad.setG(ThriftConvert.convert(tuple.getGraph(), false));
        this.quad.setS(ThriftConvert.convert(tuple.getSubject(), false));
        this.quad.setP(ThriftConvert.convert(tuple.getPredicate(), false));
        this.quad.setO(ThriftConvert.convert(tuple.getObject(), false));
    }

    @Override
    protected Quad createTuple(Node[] ns) {
        if (ns.length != 4)
            throw new IllegalArgumentException(String.format(
                    "Incorrect number of nodes to form a quad - got %d but expected 4", ns.length));
        return new Quad(ns[0], ns[1], ns[2], ns[3]);
    }

    @Override
    protected Node[] createNodes(Quad tuple) {
        return new Node[] { tuple.getGraph(), tuple.getSubject(), tuple.getPredicate(), tuple.getObject() };
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.types;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparator;
import org.apache.jena.hadoop.rdf.types.comparators.SimpleBinaryComparator;
import org.apache.jena.hadoop.rdf.types.converters.ThriftConverter;
import org.apache.jena.riot.thrift.ThriftConvert;
import org.apache.jena.riot.thrift.wire.RDF_Triple;
import org.apache.thrift.TException;

import com.hp.hpl.jena.graph.Node;
import com.hp.hpl.jena.graph.Triple;

/**
 * A writable triple whose binary form is a length prefixed Thrift encoding of
 * the triple
 */
public class TripleWritable extends AbstractNodeTupleWritable<Triple> {

    static {
        // Thrift encodings compare correctly as raw bytes so a plain binary
        // comparator suffices
        WritableComparator.define(TripleWritable.class, new SimpleBinaryComparator());
    }

    // Cached Thrift form; rebuilt lazily by write() and invalidated by set()
    private RDF_Triple triple = new RDF_Triple();

    /**
     * Creates a new empty instance
     */
    public TripleWritable() {
        this(null);
    }

    /**
     * Creates a new instance with the given value
     * 
     * @param t
     *            Triple
     */
    public TripleWritable(Triple t) {
        super(t);
    }

    /**
     * Creates a new instance from the given input
     * 
     * @param input
     *            Input
     * @return New instance
     * @throws IOException
     */
    public static TripleWritable read(DataInput input) throws IOException {
        TripleWritable t = new TripleWritable();
        t.readFields(input);
        return t;
    }

    @Override
    public void set(Triple tuple) {
        super.set(tuple);
        // The cached Thrift triple no longer reflects the tuple, clear it so
        // that write() rebuilds it from the new value
        this.triple.clear();
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        this.triple.clear();
        // Payload is length prefixed
        int tripleLength = input.readInt();
        byte[] buffer = new byte[tripleLength];
        input.readFully(buffer);
        try {
            ThriftConverter.fromBytes(buffer, this.triple);
        } catch (TException e) {
            throw new IOException(e);
        }
        this.setInternal(new Triple(ThriftConvert.convert(this.triple.getS()),
                ThriftConvert.convert(this.triple.getP()),
                ThriftConvert.convert(this.triple.getO())));
    }

    @Override
    public void write(DataOutput output) throws IOException {
        if (this.get() == null)
            throw new IOException(
                    "Null triples cannot be written using this class, consider using NodeTupleWritable instead");

        // May not have yet prepared the Thrift triple
        if (!this.triple.isSetS()) {
            Triple tuple = this.get();
            this.triple.setS(ThriftConvert.convert(tuple.getSubject(), false));
            this.triple.setP(ThriftConvert.convert(tuple.getPredicate(), false));
            this.triple.setO(ThriftConvert.convert(tuple.getObject(), false));
        }

        byte[] buffer;
        try {
            buffer = ThriftConverter.toBytes(this.triple);
        } catch (TException e) {
            throw new IOException(e);
        }
        output.writeInt(buffer.length);
        output.write(buffer);
    }

    @Override
    protected Triple createTuple(Node[] ns) {
        if (ns.length != 3)
            throw new IllegalArgumentException(String.format(
                    "Incorrect number of nodes to form a triple - got %d but expected 3", ns.length));
        return new Triple(ns[0], ns[1], ns[2]);
    }

    @Override
    protected Node[] createNodes(Triple tuple) {
        return new Node[] { tuple.getSubject(), tuple.getPredicate(), tuple.getObject() };
    }
}
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.types.comparators; + +import org.apache.hadoop.io.WritableComparator; + +/** + * A general purpose comparator that may be used with any types which can be + * compared directly on their binary encodings + */ +public class SimpleBinaryComparator extends WritableComparator { + + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java b/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java new file mode 100644 index 0000000..0675afc --- /dev/null +++ b/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.types.converters;

import java.io.ByteArrayOutputStream;

import org.apache.jena.riot.thrift.wire.RDF_Quad;
import org.apache.jena.riot.thrift.wire.RDF_Term;
import org.apache.jena.riot.thrift.wire.RDF_Triple;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.thrift.transport.TMemoryInputTransport;
import org.apache.thrift.transport.TTransport;

/**
 * Helper for converting between the binary representation of Nodes, Triples
 * and Quads and their Jena API equivalents
 * <p>
 * Transports, protocols and output buffers are cached per-thread so that
 * repeated conversions do not allocate; this makes the helper safe for
 * concurrent use from multiple threads.
 * </p>
 */
public class ThriftConverter {

    private static ThreadLocal<TMemoryInputTransport> inputTransports = new ThreadLocal<>();
    private static ThreadLocal<TProtocol> inputProtocols = new ThreadLocal<>();

    private static ThreadLocal<ByteArrayOutputStream> outputStreams = new ThreadLocal<>();
    private static ThreadLocal<TTransport> outputTransports = new ThreadLocal<>();
    private static ThreadLocal<TProtocol> outputProtocols = new ThreadLocal<>();

    /**
     * Gets (lazily creating) this thread's input transport
     */
    private static TMemoryInputTransport getInputTransport() {
        TMemoryInputTransport transport = inputTransports.get();
        if (transport != null)
            return transport;

        transport = new TMemoryInputTransport();
        inputTransports.set(transport);
        return transport;
    }

    /**
     * Gets (lazily creating) this thread's input protocol, bound to the
     * thread's input transport
     */
    private static TProtocol getInputProtocol() {
        TProtocol protocol = inputProtocols.get();
        if (protocol != null)
            return protocol;

        protocol = new TCompactProtocol(getInputTransport());
        inputProtocols.set(protocol);
        return protocol;
    }

    /**
     * Gets (lazily creating) this thread's reusable output buffer
     */
    private static ByteArrayOutputStream getOutputStream() {
        ByteArrayOutputStream output = outputStreams.get();
        if (output != null)
            return output;

        output = new ByteArrayOutputStream();
        outputStreams.set(output);
        return output;
    }

    /**
     * Gets (lazily creating) this thread's output transport, wrapping the
     * thread's output buffer
     */
    private static TTransport getOutputTransport() {
        TTransport transport = outputTransports.get();
        if (transport != null)
            return transport;

        transport = new TIOStreamTransport(getOutputStream());
        outputTransports.set(transport);
        return transport;
    }

    /**
     * Gets (lazily creating) this thread's output protocol, bound to the
     * thread's output transport
     */
    private static TProtocol getOutputProtocol() {
        TProtocol protocol = outputProtocols.get();
        if (protocol != null)
            return protocol;

        protocol = new TCompactProtocol(getOutputTransport());
        outputProtocols.set(protocol);
        return protocol;
    }

    /**
     * Serialises any Thrift struct to a fresh byte array using this thread's
     * cached protocol and buffer
     *
     * @param value
     *            Thrift struct
     * @return Serialised bytes
     * @throws TException
     */
    private static byte[] write(TBase<?, ?> value) throws TException {
        ByteArrayOutputStream output = getOutputStream();
        // Reset the reusable buffer so previous conversions don't leak in
        output.reset();

        value.write(getOutputProtocol());

        return output.toByteArray();
    }

    /**
     * Deserialises any Thrift struct from the given bytes using this thread's
     * cached protocol and transport
     *
     * @param data
     *            Serialised bytes
     * @param value
     *            Thrift struct to populate
     * @throws TException
     */
    private static void read(byte[] data, TBase<?, ?> value) throws TException {
        TMemoryInputTransport transport = getInputTransport();
        transport.reset(data);
        value.read(getInputProtocol());
    }

    public static byte[] toBytes(RDF_Term term) throws TException {
        return write(term);
    }

    public static void fromBytes(byte[] bs, RDF_Term term) throws TException {
        read(bs, term);
    }

    public static void fromBytes(byte[] buffer, RDF_Triple triple) throws TException {
        read(buffer, triple);
    }

    public static byte[] toBytes(RDF_Triple triple) throws TException {
        return write(triple);
    }

    public static void fromBytes(byte[] buffer, RDF_Quad quad) throws TException {
        read(buffer, quad);
    }

    public static byte[] toBytes(RDF_Quad quad) throws TException {
        return write(quad);
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.types;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;

import org.apache.jena.hadoop.rdf.types.CharacteristicSetWritable;
import org.apache.jena.hadoop.rdf.types.CharacteristicWritable;
import org.junit.Assert;
import org.junit.Test;

import com.hp.hpl.jena.graph.Node;
import com.hp.hpl.jena.graph.NodeFactory;

/**
 * Tests for {@link CharacteristicWritable} and
 * {@link CharacteristicSetWritable}
 */
public class CharacteristicTests {

    /**
     * Verifies that a characteristic writable survives a serialisation round
     * trip unchanged
     * 
     * @param cw
     *            Characteristic writable
     * @throws IOException
     */
    private void checkRoundTrip(CharacteristicWritable cw) throws IOException {
        // Serialise to an in-memory buffer
        ByteArrayOutputStream rawOutput = new ByteArrayOutputStream();
        cw.write(new DataOutputStream(rawOutput));

        // Deserialise from that buffer and compare with the original
        DataInputStream input = new DataInputStream(new ByteArrayInputStream(rawOutput.toByteArray()));
        CharacteristicWritable actual = CharacteristicWritable.read(input);
        Assert.assertEquals(cw, actual);
    }

    /**
     * Tests characteristic round tripping
     * 
     * @throws IOException
     */
    @Test
    public void characteristic_writable_01() throws IOException {
        Node n = NodeFactory.createURI("http://example.org");
        CharacteristicWritable expected = new CharacteristicWritable(n);
        // A fresh characteristic starts with a count of 1
        Assert.assertEquals(1, expected.getCount().get());

        this.checkRoundTrip(expected);
    }

    /**
     * Tests that equality ignores the count
     * 
     * @throws IOException
     */
    @Test
    public void characteristic_writable_02() throws IOException {
        Node n = NodeFactory.createURI("http://example.org");
        CharacteristicWritable cw1 = new CharacteristicWritable(n);
        CharacteristicWritable cw2 = new CharacteristicWritable(n, 100);
        this.checkRoundTrip(cw1);
        this.checkRoundTrip(cw2);

        // Should still be equal since equality is only on the node not the
        // count
        Assert.assertEquals(cw1, cw2);
    }

    /**
     * Tests that different nodes give unequal characteristics
     * 
     * @throws IOException
     */
    @Test
    public void characteristic_writable_03() throws IOException {
        CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
        CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other"));
        this.checkRoundTrip(cw1);
        this.checkRoundTrip(cw2);

        // Should not be equal as different nodes
        Assert.assertNotEquals(cw1, cw2);
    }

    /**
     * Verifies that a characteristic set writable survives a serialisation
     * round trip unchanged
     * 
     * @param set
     *            Characteristic set
     * @throws IOException
     */
    private void checkRoundTrip(CharacteristicSetWritable set) throws IOException {
        // Serialise to an in-memory buffer
        ByteArrayOutputStream rawOutput = new ByteArrayOutputStream();
        set.write(new DataOutputStream(rawOutput));

        // Deserialise from that buffer and compare with the original
        DataInputStream input = new DataInputStream(new ByteArrayInputStream(rawOutput.toByteArray()));
        CharacteristicSetWritable actual = CharacteristicSetWritable.read(input);
        Assert.assertEquals(set, actual);
    }

    /**
     * Checks a characteristic set has the expected size and per-characteristic
     * counts
     * 
     * @param set
     *            Set
     * @param expectedItems
     *            Expected number of characteristics
     * @param expectedCounts
     *            Expected counts for characteristics, in iteration order
     */
    protected final void checkCharacteristicSet(CharacteristicSetWritable set, int expectedItems, long[] expectedCounts) {
        Assert.assertEquals(expectedItems, set.size());
        Assert.assertEquals(expectedItems, expectedCounts.length);
        Iterator<CharacteristicWritable> characteristics = set.getCharacteristics();
        for (int index = 0; characteristics.hasNext(); index++) {
            CharacteristicWritable cw = characteristics.next();
            Assert.assertEquals(expectedCounts[index], cw.getCount().get());
        }
    }

    /**
     * Tests a set of two distinct characteristics
     * 
     * @throws IOException
     */
    @Test
    public void characteristic_set_writable_01() throws IOException {
        CharacteristicSetWritable set = new CharacteristicSetWritable();

        // Add some characteristics
        CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
        CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other"));
        set.add(cw1);
        set.add(cw2);
        this.checkCharacteristicSet(set, 2, new long[] { 1, 1 });
        this.checkRoundTrip(set);
    }

    /**
     * Tests that adding the same characteristic twice merges the counts
     * 
     * @throws IOException
     */
    @Test
    public void characteristic_set_writable_02() throws IOException {
        CharacteristicSetWritable set = new CharacteristicSetWritable();

        // Add some characteristics
        CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
        CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"), 2);
        set.add(cw1);
        set.add(cw2);
        this.checkCharacteristicSet(set, 1, new long[] { 3 });
        this.checkRoundTrip(set);
    }

    /**
     * Tests that sets with different characteristics are unequal
     * 
     * @throws IOException
     */
    @Test
    public void characteristic_set_writable_03() throws IOException {
        CharacteristicSetWritable set1 = new CharacteristicSetWritable();
        CharacteristicSetWritable set2 = new CharacteristicSetWritable();

        // Add some characteristics
        CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
        CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other"));
        set1.add(cw1);
        set2.add(cw2);
        this.checkCharacteristicSet(set1, 1, new long[] { 1 });
        this.checkCharacteristicSet(set2, 1, new long[] { 1 });
        this.checkRoundTrip(set1);
        this.checkRoundTrip(set2);

        Assert.assertNotEquals(set1, set2);
    }
}
+ */ + +package org.apache.jena.hadoop.rdf.io.types; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.io.WritableComparable; +import org.apache.jena.atlas.lib.Tuple; +import org.apache.jena.hadoop.rdf.types.NodeTupleWritable; +import org.apache.jena.hadoop.rdf.types.NodeWritable; +import org.apache.jena.hadoop.rdf.types.QuadWritable; +import org.apache.jena.hadoop.rdf.types.TripleWritable; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.hp.hpl.jena.datatypes.xsd.XSDDatatype; +import com.hp.hpl.jena.graph.Node; +import com.hp.hpl.jena.graph.NodeFactory; +import com.hp.hpl.jena.graph.Triple; +import com.hp.hpl.jena.sparql.core.Quad; + +/** + * Tests for the various RDF types defined by the + * {@link org.apache.jena.hadoop.rdf.types} package + * + * + * + */ +public class RdfTypesTest { + + private static final Logger LOG = LoggerFactory.getLogger(RdfTypesTest.class); + + private ByteArrayOutputStream outputStream; + private ByteArrayInputStream inputStream; + + /** + * Prepare for output + * + * @return Data output + */ + private DataOutput prepareOutput() { + this.outputStream = new ByteArrayOutputStream(); + return new DataOutputStream(this.outputStream); + } + + /** + * Prepare for input from the previously written output + * + * @return Data Input + */ + private DataInput prepareInput() { + this.inputStream = new ByteArrayInputStream(this.outputStream.toByteArray()); + return new DataInputStream(this.inputStream); + } + + /** + * Prepare for input from the given data + * + * @param data + * Data + * @return Data Input + */ + @SuppressWarnings("unused") + private DataInput prepareInput(byte[] data) { + this.inputStream = new ByteArrayInputStream(data); + 
return new DataInputStream(this.inputStream); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private <T extends WritableComparable> void testWriteRead(T writable, T expected) throws IOException, InstantiationException, IllegalAccessException, + ClassNotFoundException { + // Write out data + DataOutput output = this.prepareOutput(); + writable.write(output); + + // Read back in data + DataInput input = this.prepareInput(); + T actual = (T) Class.forName(writable.getClass().getName()).newInstance(); + actual.readFields(input); + + LOG.info("Original = " + writable.toString()); + LOG.info("Round Tripped = " + actual.toString()); + + // Check equivalent + Assert.assertEquals(0, expected.compareTo(actual)); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_null() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = null; + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + @Ignore + public void node_writable_variable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createVariable("x"); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + @Ignore + public void node_writable_variable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = 
NodeFactory.createVariable("really-log-variable-name-asddsfr4545egfdgdfgfdgdtgvdg-dfgfdgdfgdfgdfg4-dfvdfgdfgdfgfdgfdgdfgdfgfdg"); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_uri_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createURI("http://example.org"); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_uri_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createURI("http://user:[email protected]/some/path?key=value#id"); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_literal_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createLiteral("simple"); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_literal_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createLiteral("language", "en", null); + 
NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_literal_03() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createLiteral("string", XSDDatatype.XSDstring); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_literal_04() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createLiteral("1234", XSDDatatype.XSDinteger); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_literal_05() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createLiteral("123.4", XSDDatatype.XSDdecimal); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_literal_06() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createLiteral("12.3e4", XSDDatatype.XSDdouble); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round 
tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_literal_07() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createLiteral("true", XSDDatatype.XSDboolean); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_bnode_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createAnon(); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_bnode_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createAnon(); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + NodeWritable nw2 = new NodeWritable(n); + testWriteRead(nw2, nw2); + + Assert.assertEquals(0, nw.compareTo(nw2)); + } + + /** + * Basic triple writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void triple_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Triple t = new Triple(NodeFactory.createURI("http://example"), NodeFactory.createURI("http://predicate"), NodeFactory.createLiteral("value")); + TripleWritable tw = new TripleWritable(t); + testWriteRead(tw, tw); + } + + /** + * Basic triple 
writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void triple_writable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Triple t = new Triple(NodeFactory.createAnon(), NodeFactory.createURI("http://predicate"), NodeFactory.createLiteral("value")); + TripleWritable tw = new TripleWritable(t); + testWriteRead(tw, tw); + } + + /** + * Basic quad writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void quad_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Quad q = new Quad(Quad.defaultGraphNodeGenerated, NodeFactory.createURI("http://example"), NodeFactory.createURI("http://predicate"), + NodeFactory.createLiteral("value")); + QuadWritable qw = new QuadWritable(q); + testWriteRead(qw, qw); + } + + /** + * Basic quad writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void quad_writable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Quad q = new Quad(Quad.defaultGraphNodeGenerated, NodeFactory.createAnon(), NodeFactory.createURI("http://predicate"), + NodeFactory.createLiteral("value")); + QuadWritable qw = new QuadWritable(q); + testWriteRead(qw, qw); + } + + /** + * Basic tuple writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void tuple_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Tuple<Node> t = 
Tuple.createTuple(NodeFactory.createURI("http://one"), NodeFactory.createURI("http://two"), NodeFactory.createLiteral("value"), + NodeFactory.createLiteral("foo"), NodeFactory.createURI("http://three")); + NodeTupleWritable tw = new NodeTupleWritable(t); + testWriteRead(tw, tw); + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/pom.xml ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/pom.xml b/jena-elephas/jena-elephas-io/pom.xml new file mode 100644 index 0000000..e5134d0 --- /dev/null +++ b/jena-elephas/jena-elephas-io/pom.xml @@ -0,0 +1,67 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.jena</groupId> + <artifactId>jena-elephas</artifactId> + <version>0.9.0-SNAPSHOT</version> + </parent> + <artifactId>jena-elephas-io</artifactId> + <name>Apache Jena - Elephas - I/O</name> + <description>RDF Input/Output formats library for Hadoop</description> + + <!-- Note that versions are managed by parent POMs --> + <dependencies> + <!-- Internal Project Dependencies --> + <dependency> + <groupId>org.apache.jena</groupId> + <artifactId>jena-elephas-common</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- Hadoop Dependencies --> + <!-- Note these will be provided on the Hadoop cluster hence the provided + scope --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-common</artifactId> + <scope>provided</scope> + </dependency> + + <!-- Jena dependencies --> + <dependency> + <groupId>org.apache.jena</groupId> + <artifactId>jena-arq</artifactId> + </dependency> + + <!-- Test Dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java new file mode 100644 
/**
 * Constants related to general Hadoop IO configuration.
 */
public class HadoopIOConstants {

    /**
     * Map Reduce configuration setting controlling the maximum line length
     * accepted by line based record readers.
     */
    public static final String MAX_LINE_LENGTH = "mapreduce.input.linerecordreader.line.maxlength";

    /**
     * Configuration setting holding the job run ID.
     */
    public static final String RUN_ID = "runId";

    /**
     * Configuration setting listing the compression codecs available for use.
     */
    public static final String IO_COMPRESSION_CODECS = "io.compression.codecs";

    /**
     * Private constructor prevents instantiation.
     */
    private HadoopIOConstants() {
    }
}
/**
 * RDF IO related constants.
 */
public class RdfIOConstants {

    /**
     * Private constructor prevents instantiation.
     */
    private RdfIOConstants() {
    }

    /**
     * Configuration key used to set whether bad tuples are ignored. Ignoring
     * them is the default behaviour; when explicitly set to {@code false} bad
     * tuples will result in an {@link IOException} being thrown by the
     * relevant record readers.
     */
    public static final String INPUT_IGNORE_BAD_TUPLES = "rdf.io.input.ignore-bad-tuples";

    /**
     * Configuration key used to set the batch size used for RDF output formats
     * that take a batched writing approach. Default value is given by the
     * constant {@link #DEFAULT_OUTPUT_BATCH_SIZE}.
     */
    public static final String OUTPUT_BATCH_SIZE = "rdf.io.output.batch-size";

    /**
     * Default batch size for batched output formats.
     */
    public static final long DEFAULT_OUTPUT_BATCH_SIZE = 10000;

    /**
     * Configuration key used to control behaviour with regards to how blank
     * nodes are handled.
     * <p>
     * The default behaviour is that blank nodes are file scoped which is what
     * the RDF specifications require.
     * </p>
     * <p>
     * However in the case of a multi-stage pipeline this behaviour can cause
     * blank nodes to diverge over several jobs and introduce spurious blank
     * nodes over time. This is described in <a
     * href="https://issues.apache.org/jira/browse/JENA-820">JENA-820</a> and
     * enabling this flag for jobs in your pipeline allows you to work around
     * this problem.
     * </p>
     * <h3>Warning</h3> You should only enable this flag for jobs that take in
     * RDF output originating from previous jobs since our normal blank node
     * allocation policy ensures that blank nodes will be file scoped and unique
     * over all files (barring unfortunate hashing collisions). If you enable
     * this for jobs that take in RDF originating from other sources you may
     * incorrectly conflate blank nodes that are supposed to be distinct and
     * separate nodes.
     */
    public static final String GLOBAL_BNODE_IDENTITY = "rdf.io.input.bnodes.global-identity";
}
package org.apache.jena.hadoop.rdf.io.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Abstract line based input format that reuses the machinery from
 * {@link NLineInputFormat} to calculate the splits.
 *
 * @param <TKey>
 *            Key type
 * @param <TValue>
 *            Value type
 */
public abstract class AbstractNLineFileInputFormat<TKey, TValue> extends FileInputFormat<TKey, TValue> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNLineFileInputFormat.class);

    /**
     * Logically splits the set of input files for the job, splitting N lines
     * of the input into one split.
     *
     * @param job
     *            Job context
     * @return One split per N lines of each input file
     * @throws IOException
     *             If the input files cannot be listed or split
     * @see FileInputFormat#getSplits(JobContext)
     */
    @Override
    public final List<InputSplit> getSplits(JobContext job) throws IOException {
        // Hoist the debug-enabled check so it is not re-evaluated per file
        boolean debug = LOGGER.isDebugEnabled();
        if (debug && FileInputFormat.getInputDirRecursive(job)) {
            LOGGER.debug("Recursive searching for input data is enabled");
        }

        List<InputSplit> splits = new ArrayList<>();
        int numLinesPerSplit = NLineInputFormat.getNumLinesPerSplit(job);
        for (FileStatus status : listStatus(job)) {
            if (debug) {
                LOGGER.debug("Determining how to split input file/directory {}", status.getPath());
            }
            splits.addAll(NLineInputFormat.getSplitsForFile(status, job.getConfiguration(), numLinesPerSplit));
        }
        return splits;
    }
}
a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java new file mode 100644 index 0000000..e561cdb --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; + +/** + * Abstract implementation of a while file input format where each file is a + * single split + * + * + * + * @param <TKey> + * Key type + * @param <TValue> + * Value type + */ +public abstract class AbstractWholeFileInputFormat<TKey, TValue> extends FileInputFormat<TKey, TValue> { + + @Override + protected final boolean isSplitable(JobContext context, Path filename) { + return false; + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java new file mode 100644 index 0000000..b8fdbd5 --- /dev/null +++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
package org.apache.jena.hadoop.rdf.io.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.readers.QuadsReader;
import org.apache.jena.hadoop.rdf.types.QuadWritable;

/**
 * RDF input format capable of handling any quads serialisation that ARQ
 * supports, choosing the parser for each file from its file extension.
 */
public class QuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {

    /** {@inheritDoc} */
    @Override
    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskContext)
            throws IOException, InterruptedException {
        return new QuadsReader();
    }
}
package org.apache.jena.hadoop.rdf.io.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.readers.TriplesReader;
import org.apache.jena.hadoop.rdf.types.TripleWritable;

/**
 * RDF input format capable of handling any triples serialisation that ARQ
 * supports, choosing the parser for each file from its file extension.
 */
public class TriplesInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {

    /** {@inheritDoc} */
    @Override
    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskContext)
            throws IOException, InterruptedException {
        return new TriplesReader();
    }
}
package org.apache.jena.hadoop.rdf.io.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.readers.TriplesOrQuadsReader;
import org.apache.jena.hadoop.rdf.types.QuadWritable;

/**
 * RDF input format capable of handling any triple/quad serialisation that ARQ
 * supports, choosing the parser for each file from its file extension. Triples
 * are converted into quads in the default graph.
 */
public class TriplesOrQuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {

    /** {@inheritDoc} */
    @Override
    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskContext)
            throws IOException, InterruptedException {
        return new TriplesOrQuadsReader();
    }
}
package org.apache.jena.hadoop.rdf.io.input.jsonld;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.jsonld.JsonLDQuadReader;
import org.apache.jena.hadoop.rdf.types.QuadWritable;

/**
 * JSON-LD input format for quads; JSON-LD documents cannot be split so each
 * file is processed as a whole.
 */
public class JsonLDQuadInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {

    @Override
    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new JsonLDQuadReader();
    }
}
package org.apache.jena.hadoop.rdf.io.input.jsonld;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.jsonld.JsonLDTripleReader;
import org.apache.jena.hadoop.rdf.types.TripleWritable;

/**
 * JSON-LD input format for triples; JSON-LD documents cannot be split so each
 * file is processed as a whole.
 */
public class JsonLDTripleInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {

    @Override
    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new JsonLDTripleReader();
    }
}
package org.apache.jena.hadoop.rdf.io.input.nquads;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.nquads.BlockedNQuadsReader;
import org.apache.jena.hadoop.rdf.types.QuadWritable;

/**
 * NQuads input format where files are processed as blocks of lines rather
 * than in a line based manner as with the {@link NQuadsInputFormat} or as
 * whole files with the {@link WholeFileNQuadsInputFormat}
 * <p>
 * This provides a compromise between the higher parser setup cost of creating
 * more parsers and the benefit of being able to split input files over
 * multiple mappers.
 * </p>
 */
public class BlockedNQuadsInputFormat extends AbstractNLineFileInputFormat<LongWritable, QuadWritable> {

    @Override
    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new BlockedNQuadsReader();
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.nquads;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.nquads.NQuadsReader;
import org.apache.jena.hadoop.rdf.types.QuadWritable;

/**
 * NQuads input format
 * <p>
 * Input is processed in a line based manner (this extends
 * {@link AbstractNLineFileInputFormat}) so files may be split over multiple
 * mappers.
 * </p>
 */
public class NQuadsInputFormat extends AbstractNLineFileInputFormat<LongWritable, QuadWritable> {

    @Override
    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new NQuadsReader();
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.nquads;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.nquads.WholeFileNQuadsReader;
import org.apache.jena.hadoop.rdf.types.QuadWritable;

/**
 * NQuads input format which parses each file in its entirety, in contrast to
 * the line based processing of the {@link NQuadsInputFormat}
 * <p>
 * This trades the ability to split an input file across several mappers for a
 * lower parser setup overhead.
 * </p>
 */
public class WholeFileNQuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {

    @Override
    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // One reader per file, parsing the file as a whole
        RecordReader<LongWritable, QuadWritable> reader = new WholeFileNQuadsReader();
        return reader;
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.ntriples;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.BlockedNTriplesReader;
import org.apache.jena.hadoop.rdf.types.TripleWritable;

/**
 * NTriples input format which processes input in blocks of lines, sitting
 * between the line based {@link NTriplesInputFormat} and the whole file
 * {@link WholeFileNTriplesInputFormat}
 * <p>
 * Blocked processing balances the cost of setting up additional parsers
 * against the ability to split an input file over multiple mappers.
 * </p>
 */
public class BlockedNTriplesInputFormat extends AbstractNLineFileInputFormat<LongWritable, TripleWritable> {

    @Override
    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // One reader per split; each reader parses a block of lines
        RecordReader<LongWritable, TripleWritable> reader = new BlockedNTriplesReader();
        return reader;
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.ntriples;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.NTriplesReader;
import org.apache.jena.hadoop.rdf.types.TripleWritable;

/**
 * NTriples input format
 * <p>
 * Input is handled line by line (via {@link AbstractNLineFileInputFormat}) so
 * files may be split over multiple mappers.
 * </p>
 */
public class NTriplesInputFormat extends AbstractNLineFileInputFormat<LongWritable, TripleWritable> {

    @Override
    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // A fresh line based reader for each split
        RecordReader<LongWritable, TripleWritable> reader = new NTriplesReader();
        return reader;
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.ntriples;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.WholeFileNTriplesReader;
import org.apache.jena.hadoop.rdf.types.TripleWritable;

/**
 * NTriples input format which parses each file in its entirety, in contrast
 * to the line based processing of the {@link NTriplesInputFormat}
 * <p>
 * This trades the ability to split an input file across several mappers for a
 * lower parser setup overhead.
 * </p>
 */
public class WholeFileNTriplesInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {

    @Override
    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // One reader per file, parsing the file as a whole
        RecordReader<LongWritable, TripleWritable> reader = new WholeFileNTriplesReader();
        return reader;
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.rdfjson;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.rdfjson.RdfJsonReader;
import org.apache.jena.hadoop.rdf.types.TripleWritable;

/**
 * RDF/JSON input format
 * <p>
 * Files are processed as complete files (via
 * {@link AbstractWholeFileInputFormat}) so the input cannot be split over
 * multiple mappers.
 * </p>
 */
public class RdfJsonInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {

    @Override
    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // One reader per file, parsing the file as a whole
        RecordReader<LongWritable, TripleWritable> reader = new RdfJsonReader();
        return reader;
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.hadoop.rdf.io.input.rdfxml;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
import org.apache.jena.hadoop.rdf.io.input.readers.rdfxml.RdfXmlReader;
import org.apache.jena.hadoop.rdf.types.TripleWritable;

/**
 * RDF/XML input format
 * <p>
 * Files are processed as complete files (via
 * {@link AbstractWholeFileInputFormat}) so the input cannot be split over
 * multiple mappers.
 * </p>
 */
public class RdfXmlInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {

    @Override
    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // One reader per file, parsing the file as a whole
        RecordReader<LongWritable, TripleWritable> reader = new RdfXmlReader();
        return reader;
    }

}
