http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicSetWritable.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicSetWritable.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicSetWritable.java new file mode 100644 index 0000000..f29b156 --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicSetWritable.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.types; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableUtils; + +import com.hp.hpl.jena.graph.Node; +import com.hp.hpl.jena.graph.NodeFactory; + +/** + * Represents a characteristic set, which comprises a count of the nodes for + * which the characteristic set is applicable and a set of characteristics + * recording the number of usages of each predicate with those nodes + * + * + * + */ +public class CharacteristicSetWritable implements WritableComparable<CharacteristicSetWritable> { + + private Map<NodeWritable, CharacteristicWritable> characteristics = new TreeMap<NodeWritable, CharacteristicWritable>(); + private LongWritable count = new LongWritable(); + + /** + * Creates a new empty characteristic set with the default count of 1 + */ + public CharacteristicSetWritable() { + this(1); + } + + /** + * Creates a new characteristic set with the default count of 1 and the + * given characteristics + * + * @param characteristics + * Characteristics + */ + public CharacteristicSetWritable(CharacteristicWritable... characteristics) { + this(1, characteristics); + } + + /** + * Creates an empty characteristic set with the given count + * + * @param count + * Count + */ + public CharacteristicSetWritable(long count) { + this(count, new CharacteristicWritable[0]); + } + + /** + * Creates a new characteristic set + * + * @param count + * Count + * @param characteristics + * Characteristics + */ + public CharacteristicSetWritable(long count, CharacteristicWritable... 
characteristics) { + this.count.set(count); + for (CharacteristicWritable characteristic : characteristics) { + this.characteristics.put(characteristic.getNode(), characteristic); + } + } + + /** + * Creates a new instance and reads its data from the given input + * + * @param input + * Input + * @return New instance + * @throws IOException + */ + public static CharacteristicSetWritable read(DataInput input) throws IOException { + CharacteristicSetWritable set = new CharacteristicSetWritable(); + set.readFields(input); + return set; + } + + /** + * Gets the count + * + * @return Count + */ + public LongWritable getCount() { + return this.count; + } + + /** + * Gets the characteristics + * + * @return Characteristics + */ + public Iterator<CharacteristicWritable> getCharacteristics() { + return this.characteristics.values().iterator(); + } + + /** + * Gets the size of the characteristic set + * + * @return Size + */ + public int size() { + return this.characteristics.size(); + } + + /** + * Adds a characteristic to the set, merging it into the appropriate existing + * characteristic if applicable + * + * @param characteristic + * Characteristic + */ + public void add(CharacteristicWritable characteristic) { + if (this.characteristics.containsKey(characteristic.getNode())) { + this.characteristics.get(characteristic.getNode()).increment(characteristic.getCount().get()); + } else { + this.characteristics.put(characteristic.getNode(), characteristic); + } + } + + /** + * Adds some characteristics to the set, merging them with the appropriate + * existing characteristics if applicable + * + * @param characteristics + * Characteristics + */ + public void add(CharacteristicWritable... characteristics) { + for (CharacteristicWritable characteristic : characteristics) { + this.add(characteristic); + } + } + + /** + * Adds the contents of the other characteristic set to this characteristic + * set + * + * @param set + * Characteristic set + */ + public void add(CharacteristicSetWritable set) { + this.increment(set.getCount().get()); + Iterator<CharacteristicWritable> iter = set.getCharacteristics(); + while (iter.hasNext()) { + this.add(iter.next()); + } + } + + /** + * Gets whether the set contains a characteristic for the given predicate + * + * @param uri + * Predicate URI + * @return True if contained in the set, false otherwise + */ + public boolean hasCharacteristic(String uri) { + return this.hasCharacteristic(NodeFactory.createURI(uri)); + } + + /** + * Gets whether the set contains a characteristic for the given predicate + * + * @param n + * Predicate + * @return True if contained in the set, false otherwise + */ + public boolean hasCharacteristic(Node n) { + return this.hasCharacteristic(new NodeWritable(n)); + } + + /** + * Gets whether the set contains a characteristic for the given predicate + * + * @param n + * Predicate + * @return True if contained in the set, false otherwise + */ + public boolean hasCharacteristic(NodeWritable n) { + return this.characteristics.containsKey(n); + } + + /** + * Increments the count by the given increment + * + * @param l + * Increment + */ + public void increment(long l) { + this.count.set(this.count.get() + l); + } + + @Override + public void write(DataOutput output) throws IOException { + // Write size, then count, then characteristics + WritableUtils.writeVInt(output, this.characteristics.size()); + this.count.write(output); + for (CharacteristicWritable characteristic : this.characteristics.values()) { + characteristic.write(output); + } + } + + @Override + public 
void readFields(DataInput input) throws IOException { + // Read size, then count, then characteristics + int size = WritableUtils.readVInt(input); + this.count.readFields(input); + this.characteristics.clear(); + for (int i = 0; i < size; i++) { + CharacteristicWritable cw = CharacteristicWritable.read(input); + this.characteristics.put(cw.getNode(), cw); + } + } + + @Override + public int compareTo(CharacteristicSetWritable cs) { + int size = this.characteristics.size(); + int otherSize = cs.characteristics.size(); + if (size < otherSize) { + return -1; + } else if (size > otherSize) { + return 1; + } else { + // Compare characteristics in turn + Iterator<CharacteristicWritable> iter = this.getCharacteristics(); + Iterator<CharacteristicWritable> otherIter = cs.getCharacteristics(); + + int compare = 0; + while (iter.hasNext()) { + CharacteristicWritable c = iter.next(); + CharacteristicWritable otherC = otherIter.next(); + compare = c.compareTo(otherC); + if (compare != 0) + return compare; + } + return compare; + } + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof CharacteristicSetWritable)) + return false; + return this.compareTo((CharacteristicSetWritable) other) == 0; + } + + @Override + public int hashCode() { + // Build a hash code from characteristics + if (this.characteristics.size() == 0) + return 0; + Iterator<CharacteristicWritable> iter = this.getCharacteristics(); + int hash = 17; + while (iter.hasNext()) { + hash = hash * 31 + iter.next().hashCode(); + } + return hash; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("{ "); + builder.append(this.count.get()); + Iterator<CharacteristicWritable> iter = this.getCharacteristics(); + while (iter.hasNext()) { + builder.append(" , "); + builder.append(iter.next().toString()); + } + builder.append(" }"); + return builder.toString(); + } + +}
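For reference, a minimal usage sketch for the characteristic writables in this patch (the sketch itself is not part of the commit, and the demo class name is hypothetical; it uses only the API shown above plus the CharacteristicWritable class that follows). add() merges characteristics that share the same predicate node by summing their counts, and write()/read() round-trip the set as size, then count, then characteristics:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.jena.hadoop.rdf.types.CharacteristicSetWritable;
import org.apache.jena.hadoop.rdf.types.CharacteristicWritable;

import com.hp.hpl.jena.graph.NodeFactory;

// Hypothetical demo class, not part of the patch
public class CharacteristicSetSketch {
    public static void main(String[] args) throws IOException {
        CharacteristicSetWritable set = new CharacteristicSetWritable();

        // Adding the same predicate twice merges into one characteristic with count 2
        set.add(new CharacteristicWritable(NodeFactory.createURI("http://example.org/p")));
        set.add(new CharacteristicWritable(NodeFactory.createURI("http://example.org/p")));
        set.add(new CharacteristicWritable(NodeFactory.createURI("http://example.org/q")));
        System.out.println(set.size()); // 2 distinct predicates

        // Round trip through the Writable protocol
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        set.write(new DataOutputStream(bytes));
        CharacteristicSetWritable copy = CharacteristicSetWritable.read(
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(set.equals(copy)); // true - equality ignores counts
    }
}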
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicWritable.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicWritable.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicWritable.java new file mode 100644 index 0000000..90fc7db --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicWritable.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.types; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.WritableComparable; + +import com.hp.hpl.jena.graph.Node; + +/** + * Represents a characteristic for a single node and contains the node and a + * count associated with that node + * <p> + * Note that characteristics are compared based upon only the nodes and not + * their counts + * </p> + * + * + * + */ +public class CharacteristicWritable implements WritableComparable<CharacteristicWritable> { + + private NodeWritable node = new NodeWritable(); + private LongWritable count = new LongWritable(); + + /** + * Creates an empty characteristic writable + */ + public CharacteristicWritable() { + this(null); + } + + /** + * Creates a characteristic writable with the given node and the default + * count of 1 + * + * @param n + * Node + */ + public CharacteristicWritable(Node n) { + this(n, 1); + } + + /** + * Creates a characteristic writable with the given node and count + * + * @param n + * Node + * @param count + * Count + */ + public CharacteristicWritable(Node n, long count) { + this.node.set(n); + this.count.set(count); + } + + /** + * Creates a new instance and reads in its data from the given input + * + * @param input + * Input + * @return New instance + * @throws IOException + */ + public static CharacteristicWritable read(DataInput input) throws IOException { + CharacteristicWritable cw = new CharacteristicWritable(); + cw.readFields(input); + return cw; + } + + /** + * Gets the node + * + * @return Node + */ + public NodeWritable getNode() { + return this.node; + } + + /** + * Gets the count + * + * @return Count + */ + public LongWritable getCount() { + return this.count; + } + + /** + * Increments the count by 1 + */ + public void increment() { + this.increment(1); + } + + /** + * Increments the count by the given value + * + * @param l + * Value to increment by + */ + public void increment(long l) { + 
this.count.set(this.count.get() + l); + } + + @Override + public void write(DataOutput output) throws IOException { + this.node.write(output); + this.count.write(output); + } + + @Override + public void readFields(DataInput input) throws IOException { + this.node.readFields(input); + this.count.readFields(input); + } + + @Override + public int compareTo(CharacteristicWritable o) { + return this.node.compareTo(o.node); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof CharacteristicWritable)) + return false; + return this.compareTo((CharacteristicWritable) other) == 0; + } + + @Override + public int hashCode() { + return this.node.hashCode(); + } + + @Override + public String toString() { + return "(" + this.node.toString() + ", " + this.count.toString() + ")"; + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeTupleWritable.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeTupleWritable.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeTupleWritable.java new file mode 100644 index 0000000..e06aac4 --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeTupleWritable.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.types; + +import java.io.DataInput; +import java.io.IOException; + +import org.apache.jena.atlas.lib.Tuple; +import com.hp.hpl.jena.graph.Node; + +/** + * A writable RDF tuple + * <p> + * Unlike the more specific {@link TripleWritable} and {@link QuadWritable} this + * class allows for arbitrary length tuples and does not restrict tuples to + * being of uniform size. 
+ * </p> + * + * + * + */ +public class NodeTupleWritable extends AbstractNodeTupleWritable<Tuple<Node>> { + + /** + * Creates a new empty instance + */ + public NodeTupleWritable() { + this(null); + } + + /** + * Creates a new instance with the given value + * + * @param tuple + * Tuple + */ + public NodeTupleWritable(Tuple<Node> tuple) { + super(tuple); + } + + /** + * Creates a new instance from the given input + * + * @param input + * Input + * @return New instance + * @throws IOException + */ + public static NodeTupleWritable read(DataInput input) throws IOException { + NodeTupleWritable t = new NodeTupleWritable(); + t.readFields(input); + return t; + } + + @Override + protected Tuple<Node> createTuple(Node[] ns) { + return Tuple.create(ns); + } + + @Override + protected Node[] createNodes(Tuple<Node> tuple) { + return tuple.tuple(); + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeWritable.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeWritable.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeWritable.java new file mode 100644 index 0000000..cf00f8d --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeWritable.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.types; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.jena.hadoop.rdf.types.comparators.SimpleBinaryComparator; +import org.apache.jena.hadoop.rdf.types.converters.ThriftConverter; +import org.apache.jena.riot.thrift.TRDF; +import org.apache.jena.riot.thrift.ThriftConvert; +import org.apache.jena.riot.thrift.wire.RDF_Term; +import org.apache.thrift.TException; + +import com.hp.hpl.jena.graph.Node; +import com.hp.hpl.jena.sparql.util.NodeUtils; + +/** + * A writable for {@link Node} instances + * <p> + * This uses <a + * href="http://afs.github.io/rdf-thrift/rdf-binary-thrift.html">RDF Thrift</a> + * for the binary encoding of terms. The in-memory storage for this type is both + * a {@link Node} and a {@link RDF_Term} with lazy conversion between the two + * forms as necessary. 
+ * </p> + */ +public class NodeWritable implements WritableComparable<NodeWritable> { + + static { + WritableComparator.define(NodeWritable.class, new SimpleBinaryComparator()); + } + + private Node node; + private RDF_Term term = new RDF_Term(); + + /** + * Creates an empty writable + */ + public NodeWritable() { + this(null); + } + + /** + * Creates a new instance from the given input + * + * @param input + * Input + * @return New instance + * @throws IOException + */ + public static NodeWritable read(DataInput input) throws IOException { + NodeWritable nw = new NodeWritable(); + nw.readFields(input); + return nw; + } + + /** + * Creates a new writable with the given value + * + * @param n + * Node + */ + public NodeWritable(Node n) { + this.set(n); + } + + /** + * Gets the node + * + * @return Node + */ + public Node get() { + // We may not have yet loaded the node + if (this.node == null) { + // If term is set to undefined then node is supposed to be null + if (this.term.isSet() && !this.term.isSetUndefined()) { + this.node = ThriftConvert.convert(this.term); + } + } + return this.node; + } + + /** + * Sets the node + * + * @param n + * Node + */ + public void set(Node n) { + this.node = n; + // Clear the term for now + // We only convert the Node to a term as and when we want to write it + // out in order to not waste effort if the value is never written out + this.term.clear(); + } + + @Override + public void readFields(DataInput input) throws IOException { + // Clear previous value + this.node = null; + this.term.clear(); + + // Read in the new value + int termLength = input.readInt(); + byte[] buffer = new byte[termLength]; + input.readFully(buffer); + try { + ThriftConverter.fromBytes(buffer, this.term); + } catch (TException e) { + throw new IOException(e); + } + + // Note that we don't convert it back into a Node at this time + } + + @Override + public void write(DataOutput output) throws IOException { + // May not yet have prepared the Thrift term + if (!this.term.isSet()) { + if (this.node == null) { + this.term.setUndefined(TRDF.UNDEF); + } else { + ThriftConvert.toThrift(this.node, null, this.term, false); + } + } + + // Write out the Thrift term + byte[] buffer; + try { + buffer = ThriftConverter.toBytes(this.term); + } catch (TException e) { + throw new IOException(e); + } + output.writeInt(buffer.length); + output.write(buffer); + } + + @Override + public int compareTo(NodeWritable other) { + // Use get() rather than accessing the field directly because the node + // field is lazily instantiated from the Thrift term + return NodeUtils.compareRDFTerms(this.get(), other.get()); + } + + @Override + public String toString() { + // Use get() rather than accessing the field directly because the node + // field is lazily instantiated from the Thrift term + Node n = this.get(); + if (n == null) + return ""; + return n.toString(); + } + + @Override + public int hashCode() { + // Use get() rather than accessing the field directly because the node + // field is lazily instantiated from the Thrift term + Node n = this.get(); + return n != null ? 
this.get().hashCode() : 0; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof NodeWritable)) + return false; + return this.compareTo((NodeWritable) other) == 0; + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/QuadWritable.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/QuadWritable.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/QuadWritable.java new file mode 100644 index 0000000..3d9dd00 --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/QuadWritable.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.types; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.WritableComparator; +import org.apache.jena.hadoop.rdf.types.comparators.SimpleBinaryComparator; +import org.apache.jena.hadoop.rdf.types.converters.ThriftConverter; +import org.apache.jena.riot.thrift.ThriftConvert; +import org.apache.jena.riot.thrift.wire.RDF_Quad; +import org.apache.thrift.TException; + +import com.hp.hpl.jena.graph.Node; +import com.hp.hpl.jena.sparql.core.Quad; + +/** + * A writable quad + */ +public class QuadWritable extends AbstractNodeTupleWritable<Quad> { + + static { + WritableComparator.define(QuadWritable.class, new SimpleBinaryComparator()); + } + + private RDF_Quad quad = new RDF_Quad(); + + /** + * Creates a new empty instance + */ + public QuadWritable() { + this(null); + } + + /** + * Creates a new instance with the given value + * + * @param q + * Quad + */ + public QuadWritable(Quad q) { + super(q); + } + + /** + * Creates a new instance from the given input + * + * @param input + * Input + * @return New instance + * @throws IOException + */ + public static QuadWritable read(DataInput input) throws IOException { + QuadWritable q = new QuadWritable(); + q.readFields(input); + return q; + } + + @Override + public void set(Quad tuple) { + super.set(tuple); + this.quad.clear(); + } + + @Override + public void readFields(DataInput input) throws IOException { + this.quad.clear(); + int tripleLength = input.readInt(); + byte[] buffer = new byte[tripleLength]; + input.readFully(buffer); + try { + ThriftConverter.fromBytes(buffer, this.quad); + } catch (TException e) { + throw new IOException(e); + } + this.setInternal(new Quad(ThriftConvert.convert(this.quad.getG()), ThriftConvert.convert(this.quad.getS()), + ThriftConvert.convert(this.quad.getP()), 
ThriftConvert.convert(this.quad.getO()))); + } + + @Override + public void write(DataOutput output) throws IOException { + if (this.get() == null) + throw new IOException( + "Null quads cannot be written using this class, consider using NodeTupleWritable instead"); + + // May not have yet prepared the Thrift quad + if (!this.quad.isSetS()) { + Quad tuple = this.get(); + this.quad.setG(ThriftConvert.convert(tuple.getGraph(), false)); + this.quad.setS(ThriftConvert.convert(tuple.getSubject(), false)); + this.quad.setP(ThriftConvert.convert(tuple.getPredicate(), false)); + this.quad.setO(ThriftConvert.convert(tuple.getObject(), false)); + } + + byte[] buffer; + try { + buffer = ThriftConverter.toBytes(this.quad); + } catch (TException e) { + throw new IOException(e); + } + output.writeInt(buffer.length); + output.write(buffer); + } + + @Override + protected Quad createTuple(Node[] ns) { + if (ns.length != 4) + throw new IllegalArgumentException(String.format( + "Incorrect number of nodes to form a quad - got %d but expected 4", ns.length)); + return new Quad(ns[0], ns[1], ns[2], ns[3]); + } + + @Override + protected Node[] createNodes(Quad tuple) { + return new Node[] { tuple.getGraph(), tuple.getSubject(), tuple.getPredicate(), tuple.getObject() }; + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/TripleWritable.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/TripleWritable.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/TripleWritable.java new file mode 100644 index 0000000..a17052b --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/TripleWritable.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.types; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.WritableComparator; +import org.apache.jena.hadoop.rdf.types.comparators.SimpleBinaryComparator; +import org.apache.jena.hadoop.rdf.types.converters.ThriftConverter; +import org.apache.jena.riot.thrift.ThriftConvert; +import org.apache.jena.riot.thrift.wire.RDF_Triple; +import org.apache.thrift.TException; + +import com.hp.hpl.jena.graph.Node; +import com.hp.hpl.jena.graph.Triple; + +/** + * A writable triple + * + * + * + */ +public class TripleWritable extends AbstractNodeTupleWritable<Triple> { + + static { + WritableComparator.define(TripleWritable.class, new SimpleBinaryComparator()); + } + + private RDF_Triple triple = new RDF_Triple(); + + /** + * Creates a new empty instance + */ + public TripleWritable() { + this(null); + } + + /** + * Creates a new instance with the given value + * + * @param t + * Triple + */ + public TripleWritable(Triple t) { + super(t); + } + + /** + * Creates a new instance from the given input + * + * @param input + * Input + * @return New instance + * @throws IOException + */ + public static TripleWritable read(DataInput input) throws IOException { + TripleWritable t = new TripleWritable(); + t.readFields(input); + return t; + } + + @Override + public void set(Triple tuple) { + super.set(tuple); + this.triple.clear(); + } + + @Override + public void readFields(DataInput input) throws IOException { + this.triple.clear(); + int tripleLength = input.readInt(); + byte[] buffer = new byte[tripleLength]; + input.readFully(buffer); + try { + ThriftConverter.fromBytes(buffer, this.triple); + } catch (TException e) { + throw new IOException(e); + } + this.setInternal(new Triple(ThriftConvert.convert(this.triple.getS()), + ThriftConvert.convert(this.triple.getP()), ThriftConvert.convert(this.triple.getO()))); + } + + @Override + public void write(DataOutput output) throws IOException { + if (this.get() == null) + throw new IOException( + "Null triples cannot be written using this class, consider using NodeTupleWritable instead"); + + // May not have yet prepared the Thrift triple + if (!this.triple.isSetS()) { + Triple tuple = this.get(); + this.triple.setS(ThriftConvert.convert(tuple.getSubject(), false)); + this.triple.setP(ThriftConvert.convert(tuple.getPredicate(), false)); + this.triple.setO(ThriftConvert.convert(tuple.getObject(), false)); + } + + byte[] buffer; + try { + buffer = ThriftConverter.toBytes(this.triple); + } catch (TException e) { + throw new IOException(e); + } + output.writeInt(buffer.length); + output.write(buffer); + } + + @Override + protected Triple createTuple(Node[] ns) { + if (ns.length != 3) + throw new IllegalArgumentException(String.format( + "Incorrect number of nodes to form a triple - got %d but expected 3", ns.length)); + return new Triple(ns[0], ns[1], ns[2]); + } + + @Override + protected Node[] createNodes(Triple tuple) { + return new Node[] { tuple.getSubject(), tuple.getPredicate(), tuple.getObject() }; + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java ---------------------------------------------------------------------- diff --git 
a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java new file mode 100644 index 0000000..6c46714 --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.types.comparators; + +import org.apache.hadoop.io.WritableComparator; + +/** + * A general purpose comparator that may be used with any types which can be + * compared directly on their binary encodings + */ +public class SimpleBinaryComparator extends WritableComparator { + + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java new file mode 100644 index 0000000..0675afc --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.types.converters; + +import java.io.ByteArrayOutputStream; + +import org.apache.jena.riot.thrift.wire.RDF_Quad; +import org.apache.jena.riot.thrift.wire.RDF_Term; +import org.apache.jena.riot.thrift.wire.RDF_Triple; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TIOStreamTransport; +import org.apache.thrift.transport.TMemoryInputTransport; +import org.apache.thrift.transport.TTransport; + +/** + * Helper for converting between the binary representation of Nodes, Triples and + * Quads and their Jena API equivalents + * + */ +public class ThriftConverter { + + private static ThreadLocal<TMemoryInputTransport> inputTransports = new ThreadLocal<>(); + private static ThreadLocal<TProtocol> inputProtocols = new ThreadLocal<>(); + + private static ThreadLocal<ByteArrayOutputStream> outputStreams = new ThreadLocal<>(); + private static ThreadLocal<TTransport> outputTransports = new ThreadLocal<>(); + private static ThreadLocal<TProtocol> outputProtocols = new ThreadLocal<>(); + + private static TMemoryInputTransport getInputTransport() { + TMemoryInputTransport transport = inputTransports.get(); + if (transport != null) + return transport; + + transport = new TMemoryInputTransport(); + inputTransports.set(transport); + return transport; + } + + private static TProtocol getInputProtocol() { + TProtocol protocol = inputProtocols.get(); + if (protocol != null) + return protocol; + + protocol = new TCompactProtocol(getInputTransport()); + inputProtocols.set(protocol); + return protocol; + } + + private static ByteArrayOutputStream getOutputStream() { + ByteArrayOutputStream output = outputStreams.get(); + if (output != null) + return output; + + output = new ByteArrayOutputStream(); + outputStreams.set(output); + return output; + } + + private static TTransport getOutputTransport() { + TTransport transport = outputTransports.get(); + if (transport != null) + return transport; + + transport = new TIOStreamTransport(getOutputStream()); + outputTransports.set(transport); + return transport; + } + + private static TProtocol getOutputProtocol() { + TProtocol protocol = outputProtocols.get(); + if (protocol != null) + return protocol; + + protocol = new TCompactProtocol(getOutputTransport()); + outputProtocols.set(protocol); + return protocol; + } + + public static byte[] toBytes(RDF_Term term) throws TException { + ByteArrayOutputStream output = getOutputStream(); + output.reset(); + + TProtocol protocol = getOutputProtocol(); + term.write(protocol); + + return output.toByteArray(); + } + + public static void fromBytes(byte[] bs, RDF_Term term) throws TException { + TMemoryInputTransport transport = getInputTransport(); + transport.reset(bs); + TProtocol protocol = getInputProtocol(); + term.read(protocol); + } + + public static void fromBytes(byte[] buffer, RDF_Triple triple) throws TException { + TMemoryInputTransport transport = getInputTransport(); + transport.reset(buffer); + TProtocol protocol = getInputProtocol(); + triple.read(protocol); + } + + public static byte[] toBytes(RDF_Triple triple) throws TException { + ByteArrayOutputStream output = getOutputStream(); + output.reset(); + + TProtocol protocol = getOutputProtocol(); + triple.write(protocol); + + return output.toByteArray(); + } + + public static void fromBytes(byte[] buffer, RDF_Quad quad) throws TException { + TMemoryInputTransport transport = getInputTransport(); + 
transport.reset(buffer); + TProtocol protocol = getInputProtocol(); + quad.read(protocol); + } + + public static byte[] toBytes(RDF_Quad quad) throws TException { + ByteArrayOutputStream output = getOutputStream(); + output.reset(); + + TProtocol protocol = getOutputProtocol(); + quad.write(protocol); + + return output.toByteArray(); + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java b/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java new file mode 100644 index 0000000..7214b14 --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.types; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.jena.hadoop.rdf.types.CharacteristicSetWritable; +import org.apache.jena.hadoop.rdf.types.CharacteristicWritable; +import org.junit.Assert; +import org.junit.Test; + +import com.hp.hpl.jena.graph.Node; +import com.hp.hpl.jena.graph.NodeFactory; + +/** + * Tests for {@link CharacteristicWritable} and + * {@link CharacteristicSetWritable} + * + * + * + */ +public class CharacteristicTests { + + /** + * Checks whether a writable round trips successfully + * + * @param cw + * Characteristic writable + * @throws IOException + */ + private void checkRoundTrip(CharacteristicWritable cw) throws IOException { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + DataOutputStream output = new DataOutputStream(outputStream); + cw.write(output); + + ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + DataInputStream input = new DataInputStream(inputStream); + CharacteristicWritable actual = CharacteristicWritable.read(input); + Assert.assertEquals(cw, actual); + } + + /** + * Tests characteristic round tripping + * + * @throws IOException + */ + @Test + public void characteristic_writable_01() throws IOException { + Node n = NodeFactory.createURI("http://example.org"); + CharacteristicWritable expected = new CharacteristicWritable(n); + Assert.assertEquals(1, expected.getCount().get()); + + this.checkRoundTrip(expected); + } + + /** + * Tests characteristic properties + * + * @throws IOException + */ + @Test + public void characteristic_writable_02() throws IOException { + Node n = NodeFactory.createURI("http://example.org"); + CharacteristicWritable cw1 = new CharacteristicWritable(n); + CharacteristicWritable cw2 = new CharacteristicWritable(n, 100); + this.checkRoundTrip(cw1); + this.checkRoundTrip(cw2); + + // Should still be equal since equality is only on the node not the + // count + Assert.assertEquals(cw1, cw2); + } + + /** + * Tests characteristic properties + * + * @throws IOException + */ + @Test + public void characteristic_writable_03() throws IOException { + CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org")); + CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other")); + this.checkRoundTrip(cw1); + this.checkRoundTrip(cw2); + + // Should not be equal as different nodes + Assert.assertNotEquals(cw1, cw2); + } + + /** + * Checks that a writable round trips + * + * @param set + * Characteristic set + * @throws IOException + */ + private void checkRoundTrip(CharacteristicSetWritable set) throws IOException { + // Test round trip + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + DataOutputStream output = new DataOutputStream(outputStream); + set.write(output); + + ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + DataInputStream input = new DataInputStream(inputStream); + CharacteristicSetWritable actual = CharacteristicSetWritable.read(input); + Assert.assertEquals(set, actual); + } + + /** + * Checks a characteristic set + * + * @param set + * Set + * @param expectedItems + * Expected number of characteristics + * @param expectedCounts + * Expected counts for characteristics + */ + protected final void 
checkCharacteristicSet(CharacteristicSetWritable set, int expectedItems, long[] expectedCounts) { + Assert.assertEquals(expectedItems, set.size()); + Assert.assertEquals(expectedItems, expectedCounts.length); + Iterator<CharacteristicWritable> iter = set.getCharacteristics(); + int i = 0; + while (iter.hasNext()) { + CharacteristicWritable cw = iter.next(); + Assert.assertEquals(expectedCounts[i], cw.getCount().get()); + i++; + } + } + + /** + * Tests characteristic sets + * + * @throws IOException + */ + @Test + public void characteristic_set_writable_01() throws IOException { + CharacteristicSetWritable set = new CharacteristicSetWritable(); + + // Add some characteristics + CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org")); + CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other")); + set.add(cw1); + set.add(cw2); + this.checkCharacteristicSet(set, 2, new long[] { 1, 1 }); + this.checkRoundTrip(set); + } + + /** + * Tests characteristic sets + * + * @throws IOException + */ + @Test + public void characteristic_set_writable_02() throws IOException { + CharacteristicSetWritable set = new CharacteristicSetWritable(); + + // Add some characteristics + CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org")); + CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"), 2); + set.add(cw1); + set.add(cw2); + this.checkCharacteristicSet(set, 1, new long[] { 3 }); + this.checkRoundTrip(set); + } + + /** + * Tests characteristic sets + * + * @throws IOException + */ + @Test + public void characteristic_set_writable_03() throws IOException { + CharacteristicSetWritable set1 = new CharacteristicSetWritable(); + CharacteristicSetWritable set2 = new CharacteristicSetWritable(); + + // Add some characteristics + CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org")); + CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other")); + set1.add(cw1); + set2.add(cw2); + this.checkCharacteristicSet(set1, 1, new long[] { 1 }); + this.checkCharacteristicSet(set2, 1, new long[] { 1 }); + this.checkRoundTrip(set1); + this.checkRoundTrip(set2); + + Assert.assertNotEquals(set1, set2); + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java b/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java new file mode 100644 index 0000000..a70dfb0 --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.types; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.io.WritableComparable; +import org.apache.jena.atlas.lib.Tuple; +import org.apache.jena.hadoop.rdf.types.NodeTupleWritable; +import org.apache.jena.hadoop.rdf.types.NodeWritable; +import org.apache.jena.hadoop.rdf.types.QuadWritable; +import org.apache.jena.hadoop.rdf.types.TripleWritable; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.hp.hpl.jena.datatypes.xsd.XSDDatatype; +import com.hp.hpl.jena.graph.Node; +import com.hp.hpl.jena.graph.NodeFactory; +import com.hp.hpl.jena.graph.Triple; +import com.hp.hpl.jena.sparql.core.Quad; + +/** + * Tests for the various RDF types defined by the + * {@link org.apache.jena.hadoop.rdf.types} package + * + * + * + */ +public class RdfTypesTest { + + private static final Logger LOG = LoggerFactory.getLogger(RdfTypesTest.class); + + private ByteArrayOutputStream outputStream; + private ByteArrayInputStream inputStream; + + /** + * Prepare for output + * + * @return Data output + */ + private DataOutput prepareOutput() { + this.outputStream = new ByteArrayOutputStream(); + return new DataOutputStream(this.outputStream); + } + + /** + * Prepare for input from the previously written output + * + * @return Data Input + */ + private DataInput prepareInput() { + this.inputStream = new ByteArrayInputStream(this.outputStream.toByteArray()); + return new DataInputStream(this.inputStream); + } + + /** + * Prepare for input from the given data + * + * @param data + * Data + * @return Data Input + */ + @SuppressWarnings("unused") + private DataInput prepareInput(byte[] data) { + this.inputStream = new ByteArrayInputStream(data); + return new DataInputStream(this.inputStream); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private <T extends WritableComparable> void testWriteRead(T writable, T expected) throws IOException, InstantiationException, IllegalAccessException, + ClassNotFoundException { + // Write out data + DataOutput output = this.prepareOutput(); + writable.write(output); + + // Read back in data + DataInput input = this.prepareInput(); + T actual = (T) Class.forName(writable.getClass().getName()).newInstance(); + actual.readFields(input); + + LOG.info("Original = " + writable.toString()); + LOG.info("Round Tripped = " + actual.toString()); + + // Check equivalent + Assert.assertEquals(0, expected.compareTo(actual)); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_null() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = null; + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } 
+ + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + @Ignore + public void node_writable_variable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createVariable("x"); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + @Ignore + public void node_writable_variable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createVariable("really-long-variable-name-asddsfr4545egfdgdfgfdgdtgvdg-dfgfdgdfgdfgdfg4-dfvdfgdfgdfgfdgfdgdfgdfgfdg"); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_uri_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createURI("http://example.org"); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_uri_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createURI("http://user:[email protected]/some/path?key=value#id"); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_literal_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createLiteral("simple"); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_literal_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createLiteral("language", "en", null); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_literal_03() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createLiteral("string", XSDDatatype.XSDstring); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void 
node_writable_literal_04() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createLiteral("1234", XSDDatatype.XSDinteger); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_literal_05() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createLiteral("123.4", XSDDatatype.XSDdecimal); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_literal_06() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createLiteral("12.3e4", XSDDatatype.XSDdouble); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_literal_07() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createLiteral("true", XSDDatatype.XSDboolean); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_bnode_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createAnon(); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + } + + /** + * Basic node writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void node_writable_bnode_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Node n = NodeFactory.createAnon(); + NodeWritable nw = new NodeWritable(n); + testWriteRead(nw, nw); + NodeWritable nw2 = new NodeWritable(n); + testWriteRead(nw2, nw2); + + Assert.assertEquals(0, nw.compareTo(nw2)); + } + + /** + * Basic triple writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void triple_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Triple t = new Triple(NodeFactory.createURI("http://example"), NodeFactory.createURI("http://predicate"), NodeFactory.createLiteral("value")); + TripleWritable tw = new TripleWritable(t); + testWriteRead(tw, tw); + } + + /** + * Basic triple writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void triple_writable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Triple t 
= new Triple(NodeFactory.createAnon(), NodeFactory.createURI("http://predicate"), NodeFactory.createLiteral("value")); + TripleWritable tw = new TripleWritable(t); + testWriteRead(tw, tw); + } + + /** + * Basic quad writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void quad_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Quad q = new Quad(Quad.defaultGraphNodeGenerated, NodeFactory.createURI("http://example"), NodeFactory.createURI("http://predicate"), + NodeFactory.createLiteral("value")); + QuadWritable qw = new QuadWritable(q); + testWriteRead(qw, qw); + } + + /** + * Basic quad writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void quad_writable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Quad q = new Quad(Quad.defaultGraphNodeGenerated, NodeFactory.createAnon(), NodeFactory.createURI("http://predicate"), + NodeFactory.createLiteral("value")); + QuadWritable qw = new QuadWritable(q); + testWriteRead(qw, qw); + } + + /** + * Basic tuple writable round tripping test + * + * @throws IOException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ClassNotFoundException + */ + @Test + public void tuple_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException { + Tuple<Node> t = Tuple.createTuple(NodeFactory.createURI("http://one"), NodeFactory.createURI("http://two"), NodeFactory.createLiteral("value"), + NodeFactory.createLiteral("foo"), NodeFactory.createURI("http://three")); + NodeTupleWritable tw = new NodeTupleWritable(t); + testWriteRead(tw, tw); + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/pom.xml ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-io/pom.xml b/jena-hadoop-rdf/jena-elephas-io/pom.xml new file mode 100644 index 0000000..2be37f9 --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-io/pom.xml @@ -0,0 +1,67 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.jena</groupId> + <artifactId>jena-elephas</artifactId> + <version>0.9.0-SNAPSHOT</version> + </parent> + <artifactId>jena-elephas-io</artifactId> + <name>Apache Jena - Elephas - I/O</name> + <description>RDF Input/Output formats library for Hadoop</description> + + <!-- Note that versions are managed by parent POMs --> + <dependencies> + <!-- Internal Project Dependencies --> + <dependency> + <groupId>org.apache.jena</groupId> + <artifactId>jena-hadoop-rdf-common</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- Hadoop Dependencies --> + <!-- Note these will be provided on the Hadoop cluster hence the provided + scope --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-common</artifactId> + <scope>provided</scope> + </dependency> + + <!-- Jena dependencies --> + <dependency> + <groupId>org.apache.jena</groupId> + <artifactId>jena-arq</artifactId> + </dependency> + + <!-- Test Dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java new file mode 100644 index 0000000..5c1b41c --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io; + +/** + * Hadoop IO related constants + * + * + * + */ +public class HadoopIOConstants { + + /** + * Private constructor prevents instantiation + */ + private HadoopIOConstants() { + } + + /** + * Map Reduce configuration setting for max line length + */ + public static final String MAX_LINE_LENGTH = "mapreduce.input.linerecordreader.line.maxlength"; + + /** + * Run ID + */ + public static final String RUN_ID = "runId"; + + /** + * Compression codecs to use + */ + public static final String IO_COMPRESSION_CODECS = "io.compression.codecs"; +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java new file mode 100644 index 0000000..27c2bb2 --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io; + +import java.io.IOException; + +/** + * RDF IO related constants + * + * + * + */ +public class RdfIOConstants { + + /** + * Private constructor prevents instantiation + */ + private RdfIOConstants() { + } + + /** + * Configuration key used to set whether bad tuples are ignored. This is the + * default behaviour; when explicitly set to {@code false}, bad tuples will + * result in {@link IOException} being thrown by the relevant record + * readers. + */ + public static final String INPUT_IGNORE_BAD_TUPLES = "rdf.io.input.ignore-bad-tuples"; + + /** + * Configuration key used to set the batch size used for RDF output formats + * that take a batched writing approach. Default value is given by the + * constant {@link #DEFAULT_OUTPUT_BATCH_SIZE}. + */ + public static final String OUTPUT_BATCH_SIZE = "rdf.io.output.batch-size"; + + /** + * Default batch size for batched output formats + */ + public static final long DEFAULT_OUTPUT_BATCH_SIZE = 10000; + + /** + * Configuration key used to control behaviour with regard to how blank + * nodes are handled. + * <p> + * The default behaviour is that blank nodes are file scoped, which is what + * the RDF specifications require. + * </p> + * <p> + * However, in the case of a multi-stage pipeline this behaviour can cause + * blank nodes to diverge over several jobs and introduce spurious blank + * nodes over time. This is described in <a + * href="https://issues.apache.org/jira/browse/JENA-820">JENA-820</a> and + * enabling this flag for jobs in your pipeline allows you to work around + * this problem. + * </p> + * <h3>Warning</h3> You should only enable this flag for jobs that take in + * RDF output originating from previous jobs since our normal blank node + * allocation policy ensures that blank nodes will be file scoped and unique + * over all files (barring unfortunate hashing collisions). If you enable + * this for jobs that take in RDF originating from other sources you may + * incorrectly conflate blank nodes that are supposed to be distinct and + * separate nodes. + */ + public static final String GLOBAL_BNODE_IDENTITY = "rdf.io.input.bnodes.global-identity"; +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java new file mode 100644 index 0000000..1fcb030 --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract line based input format that reuses the machinery from + * {@link NLineInputFormat} to calculate the splits + * + * + * + * @param <TKey> + * Key type + * @param <TValue> + * Value type + */ +public abstract class AbstractNLineFileInputFormat<TKey, TValue> extends FileInputFormat<TKey, TValue> { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNLineFileInputFormat.class); + + /** + * Logically splits the set of input files for the job, using N lines of + * the input as one split.
+ * + * @see FileInputFormat#getSplits(JobContext) + */ + public final List<InputSplit> getSplits(JobContext job) throws IOException { + boolean debug = LOGGER.isDebugEnabled(); + if (debug && FileInputFormat.getInputDirRecursive(job)) { + LOGGER.debug("Recursive searching for input data is enabled"); + } + + List<InputSplit> splits = new ArrayList<InputSplit>(); + int numLinesPerSplit = NLineInputFormat.getNumLinesPerSplit(job); + for (FileStatus status : listStatus(job)) { + if (debug) { + LOGGER.debug("Determining how to split input file/directory {}", status.getPath()); + } + splits.addAll(NLineInputFormat.getSplitsForFile(status, job.getConfiguration(), numLinesPerSplit)); + } + return splits; + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java new file mode 100644 index 0000000..e561cdb --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; + +/** + * Abstract implementation of a whole file input format where each file is a + * single split + * + * + * + * @param <TKey> + * Key type + * @param <TValue> + * Value type + */ +public abstract class AbstractWholeFileInputFormat<TKey, TValue> extends FileInputFormat<TKey, TValue> { + + @Override + protected final boolean isSplitable(JobContext context, Path filename) { + return false; + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java new file mode 100644 index 0000000..b8fdbd5 --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.jena.hadoop.rdf.io.input; + +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.jena.hadoop.rdf.io.input.readers.QuadsReader; +import org.apache.jena.hadoop.rdf.types.QuadWritable; + + +/** + * RDF input format that can handle any RDF quads format that ARQ supports + * selecting the format to use for each file based upon the file extension + * + * + * + */ +public class QuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> { + + @Override + public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + return new QuadsReader(); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java new file mode 100644 index 0000000..03f394a --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.io.input; + +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.jena.hadoop.rdf.io.input.readers.TriplesReader; +import org.apache.jena.hadoop.rdf.types.TripleWritable; + +/** + * RDF input format that can handle any RDF triples format that ARQ supports + * selecting the format to use for each file based upon the file extension + */ +public class TriplesInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> { + + @Override + public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + return new TriplesReader(); + } + +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java new file mode 100644 index 0000000..bfd643e --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.io.input; + +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.jena.hadoop.rdf.io.input.readers.TriplesOrQuadsReader; +import org.apache.jena.hadoop.rdf.types.QuadWritable; + + +/** + * RDF input format that can handle any RDF triple/quads format that ARQ + * supports selecting the format to use for each file based upon the file + * extension. Triples are converted into quads in the default graph. + * + * + * + */ +public class TriplesOrQuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> { + + @Override + public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + return new TriplesOrQuadsReader(); + } + +}
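To make the intended usage of the above input formats and configuration keys concrete, here is a minimal job driver sketch. It is not part of the patch: the driver class, mapper and paths (ExampleTriplesDriver, TripleCountMapper, /example/input, /example/output) are hypothetical, and only TriplesInputFormat, TripleWritable and the RdfIOConstants.INPUT_IGNORE_BAD_TUPLES key come from the code above; everything else is standard Hadoop MapReduce wiring.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
import org.apache.jena.hadoop.rdf.io.input.TriplesInputFormat;
import org.apache.jena.hadoop.rdf.types.TripleWritable;

public class ExampleTriplesDriver {

    // Hypothetical mapper: counts the triples parsed from the input files
    public static class TripleCountMapper extends Mapper<LongWritable, TripleWritable, Text, LongWritable> {
        private static final Text KEY = new Text("triples");
        private static final LongWritable ONE = new LongWritable(1);

        @Override
        protected void map(LongWritable key, TripleWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(KEY, ONE);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // Ignore malformed tuples instead of failing the task (the documented default behaviour)
        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true);

        Job job = Job.getInstance(config, "Triple count over RDF input");
        job.setJarByClass(ExampleTriplesDriver.class);

        // Whole file input format: the parser is chosen per file from its extension
        job.setInputFormatClass(TriplesInputFormat.class);
        job.setMapperClass(TripleCountMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path("/example/input"));   // hypothetical path
        FileOutputFormat.setOutputPath(job, new Path("/example/output")); // hypothetical path

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

QuadsInputFormat and TriplesOrQuadsInputFormat slot into the same wiring with QuadWritable as the map input value type; for line based formats built on AbstractNLineFileInputFormat the split granularity can additionally be tuned via NLineInputFormat.setNumLinesPerSplit(job, n).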
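The round tripping tests earlier in this patch all delegate to a testWriteRead(...) helper that is not included in this excerpt. As a rough guide to what such a helper typically does for a Hadoop WritableComparable, the sketch below serialises the value, reads it back into a reflectively created instance (which would account for the InstantiationException, IllegalAccessException and ClassNotFoundException clauses on the test methods) and asserts equality; it is an illustrative guess, not the actual helper.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.junit.Assert;

public class WritableRoundTripSketch {

    // Serializes the writable, reads it back into a reflectively created instance
    // and checks the copy is equal to (and sorts identically with) the expected value
    public static <T extends WritableComparable<T>> void testWriteRead(T input, T expected)
            throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
        // Write the value to an in-memory buffer
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream output = new DataOutputStream(buffer)) {
            input.write(output);
        }

        // Create a fresh instance of the same class and populate it from the buffer
        @SuppressWarnings("unchecked")
        T actual = (T) Class.forName(input.getClass().getName()).newInstance();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            actual.readFields(in);
        }

        // The round tripped value should be equal and should compare as equal
        Assert.assertEquals(expected, actual);
        Assert.assertEquals(0, expected.compareTo(actual));
    }
}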
