Updated Branches: refs/heads/trunk 4b8fae259 -> 768b7917f
GIRAPH-779: Make it easier to reuse objects with hive-io input (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/768b7917 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/768b7917 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/768b7917 Branch: refs/heads/trunk Commit: 768b7917f260bc3528f30c541e8974a841fe3fbc Parents: 4b8fae2 Author: Maja Kabiljo <[email protected]> Authored: Thu Oct 10 21:55:47 2013 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Thu Oct 10 21:55:47 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + giraph-hive/pom.xml | 4 ++ .../apache/giraph/hive/common/HiveParsing.java | 21 +++++--- .../hive/input/edge/SimpleHiveToEdge.java | 13 +++++ .../input/edge/examples/HiveIntDoubleEdge.java | 15 +++--- .../input/edge/examples/HiveIntNullEdge.java | 10 ++-- .../hive/input/vertex/SimpleHiveToVertex.java | 50 +++++++++++++------- .../examples/HiveIntDoubleDoubleVertex.java | 12 ++--- .../vertex/examples/HiveIntIntNullVertex.java | 10 ++-- .../vertex/examples/HiveIntNullNullVertex.java | 14 +++--- .../giraph/hive/jython/JythonHiveToEdge.java | 6 +-- .../giraph/hive/jython/JythonHiveToVertex.java | 4 +- 12 files changed, 103 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/768b7917/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 2d746b7..3df3164 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-779: Make it easier to reuse objects with hive-io input (majakabiljo) + GIRAPH-778: Testing with TestGraph is broken (majakabiljo) GIRAPH-777: Fix bug from GIRAPH-775 (majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/768b7917/giraph-hive/pom.xml ---------------------------------------------------------------------- diff --git a/giraph-hive/pom.xml b/giraph-hive/pom.xml index daa08b3..7feb510 100644 --- a/giraph-hive/pom.xml +++ b/giraph-hive/pom.xml @@ -112,6 +112,10 @@ under the License. <groupId>org.apache.hive</groupId> <artifactId>hive-metastore</artifactId> </dependency> + <dependency> + <groupId>org.python</groupId> + <artifactId>jython</artifactId> + </dependency> <!-- test dependencies. sorted lexicographically. --> <dependency> http://git-wip-us.apache.org/repos/asf/giraph/blob/768b7917/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java index 56f5119..7ceba23 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java @@ -62,33 +62,40 @@ public class HiveParsing { * Parse a Integer ID from a Hive record * @param record Hive record to parse * @param columnIndex offset of column in row - * @return LongWritable ID + * @param reusableId Reusable vertex id object + * @return IntWritable ID */ public static IntWritable parseIntID(HiveReadableRecord record, - int columnIndex) { - return new IntWritable(parseInt(record, columnIndex)); + int columnIndex, IntWritable reusableId) { + reusableId.set(parseInt(record, columnIndex)); + return reusableId; } /** * Parse a Long ID from a Hive record * @param record Hive record to parse * @param columnIndex offset of column in row + * @param reusableId Reusable vertex id object * @return LongWritable ID */ public static LongWritable parseLongID(HiveReadableRecord record, - int columnIndex) { - return new LongWritable(record.getLong(columnIndex)); + int columnIndex, LongWritable reusableId) { + reusableId.set(record.getLong(columnIndex)); + return reusableId; } /** * Parse a weight from a Hive record * @param record Hive record to parse * @param columnIndex offset of column in row + * @param reusableDoubleWritable Reusable DoubleWritable object + * * @return DoubleWritable weight */ public static DoubleWritable parseDoubleWritable(HiveReadableRecord record, - int columnIndex) { - return new DoubleWritable(record.getDouble(columnIndex)); + int columnIndex, DoubleWritable reusableDoubleWritable) { + reusableDoubleWritable.set(record.getDouble(columnIndex)); + return reusableDoubleWritable; } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/768b7917/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java index c365a87..55d9299 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java @@ -67,6 +67,7 @@ public abstract class SimpleHiveToEdge<I extends WritableComparable, @Override public void initializeRecords(Iterator<HiveReadableRecord> records) { this.records = records; + reusableEdge.setSourceVertexId(getConf().createVertexId()); reusableEdge.setEdge(getConf().createReusableEdge()); } @@ -83,5 +84,17 @@ public abstract class SimpleHiveToEdge<I extends WritableComparable, reusableEdge.setEdgeValue(getEdgeValue(record)); return reusableEdge; } + + protected I getReusableSourceVertexId() { + return reusableEdge.getSourceVertexId(); + } + + protected I getReusableTargetVertexId() { + return reusableEdge.getTargetVertexId(); + } + + protected E getReusableEdgeValue() { + return reusableEdge.getEdgeValue(); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/768b7917/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java index 9f95da2..55ccca3 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java @@ -24,9 +24,9 @@ import org.apache.hadoop.io.IntWritable; import com.facebook.hiveio.common.HiveType; import com.facebook.hiveio.input.HiveInputDescription; +import com.facebook.hiveio.input.parser.Records; import com.facebook.hiveio.record.HiveReadableRecord; import com.facebook.hiveio.schema.HiveTableSchema; -import com.google.common.base.Preconditions; /** * A simple HiveToEdge with integer IDs and double edge values. @@ -35,23 +35,24 @@ public class HiveIntDoubleEdge extends SimpleHiveToEdge<IntWritable, DoubleWritable> { @Override public void checkInput(HiveInputDescription inputDesc, HiveTableSchema schema) { - Preconditions.checkArgument(schema.columnType(0) == HiveType.INT); - Preconditions.checkArgument(schema.columnType(1) == HiveType.INT); - Preconditions.checkArgument(schema.columnType(2) == HiveType.DOUBLE); + Records.verifyType(0, HiveType.INT, schema); + Records.verifyType(1, HiveType.INT, schema); + Records.verifyType(2, HiveType.DOUBLE, schema); } @Override public DoubleWritable getEdgeValue(HiveReadableRecord hiveRecord) { - return HiveParsing.parseDoubleWritable(hiveRecord, 2); + return HiveParsing.parseDoubleWritable(hiveRecord, 2, + getReusableEdgeValue()); } @Override public IntWritable getSourceVertexId(HiveReadableRecord hiveRecord) { - return HiveParsing.parseIntID(hiveRecord, 0); + return HiveParsing.parseIntID(hiveRecord, 0, getReusableSourceVertexId()); } @Override public IntWritable getTargetVertexId(HiveReadableRecord hiveRecord) { - return HiveParsing.parseIntID(hiveRecord, 1); + return HiveParsing.parseIntID(hiveRecord, 1, getReusableTargetVertexId()); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/768b7917/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java index 87e1bd5..cc1013c 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java @@ -25,9 +25,9 @@ import org.apache.log4j.Logger; import com.facebook.hiveio.common.HiveType; import com.facebook.hiveio.input.HiveInputDescription; +import com.facebook.hiveio.input.parser.Records; import com.facebook.hiveio.record.HiveReadableRecord; import com.facebook.hiveio.schema.HiveTableSchema; -import com.google.common.base.Preconditions; /** * A simple HiveToEdge with integer IDs, no edge value, that assumes the Hive @@ -40,8 +40,8 @@ public class HiveIntNullEdge @Override public void checkInput(HiveInputDescription inputDesc, HiveTableSchema schema) { - Preconditions.checkArgument(schema.columnType(0) == HiveType.INT); - Preconditions.checkArgument(schema.columnType(1) == HiveType.INT); + Records.verifyType(0, HiveType.INT, schema); + Records.verifyType(1, HiveType.INT, schema); } @Override public NullWritable getEdgeValue(HiveReadableRecord hiveRecord) { @@ -50,11 +50,11 @@ public class HiveIntNullEdge @Override public IntWritable getSourceVertexId(HiveReadableRecord hiveRecord) { - return HiveParsing.parseIntID(hiveRecord, 0); + return HiveParsing.parseIntID(hiveRecord, 0, getReusableSourceVertexId()); } @Override public IntWritable getTargetVertexId(HiveReadableRecord hiveRecord) { - return HiveParsing.parseIntID(hiveRecord, 1); + return HiveParsing.parseIntID(hiveRecord, 1, getReusableTargetVertexId()); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/768b7917/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java index 5483a68..321c945 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java @@ -18,8 +18,8 @@ package org.apache.giraph.hive.input.vertex; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.OutEdges; import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -43,7 +43,14 @@ public abstract class SimpleHiveToVertex<I extends WritableComparable, private Iterator<HiveReadableRecord> records; /** Reusable vertex object */ - private Vertex<I, V, E> reusableVertex = null; + private Vertex<I, V, E> reusableVertex; + + /** Reusable vertex id */ + private I reusableVertexId; + /** Reusable vertex value */ + private V reusableVertexValue; + /** Reusable edges */ + private OutEdges<I, E> reusableOutEdges; /** * Read the Vertex's ID from the HiveRecord given. @@ -70,17 +77,12 @@ public abstract class SimpleHiveToVertex<I extends WritableComparable, public abstract Iterable<Edge<I, E>> getEdges(HiveReadableRecord record); @Override - public void setConf( - ImmutableClassesGiraphConfiguration<I, V, E> conf) { - super.setConf(conf); - if (conf.reuseVertexObjects()) { - reusableVertex = getConf().createVertex(); - } - } - - @Override public void initializeRecords(Iterator<HiveReadableRecord> records) { this.records = records; + reusableVertex = getConf().createVertex(); + reusableVertexId = getConf().createVertexId(); + reusableVertexValue = getConf().createVertexValue(); + reusableOutEdges = getConf().createOutEdges(); } @Override @@ -94,11 +96,25 @@ public abstract class SimpleHiveToVertex<I extends WritableComparable, I id = getVertexId(record); V value = getVertexValue(record); Iterable<Edge<I, E>> edges = getEdges(record); - Vertex<I, V, E> vertex = reusableVertex; - if (vertex == null) { - vertex = getConf().createVertex(); - } - vertex.initialize(id, value, edges); - return vertex; + reusableVertex.initialize(id, value, edges); + return reusableVertex; + } + + protected I getReusableVertexId() { + return reusableVertexId; + } + + protected V getReusableVertexValue() { + return reusableVertexValue; + } + + /** + * Get reusable OutEdges object + * + * @param <OE> Type of OutEdges + * @return Reusable OutEdges object + */ + protected <OE extends OutEdges<I, E>> OE getReusableOutEdges() { + return (OE) reusableOutEdges; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/768b7917/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java index 1af01d4..cae3e01 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java @@ -25,9 +25,9 @@ import org.apache.hadoop.io.IntWritable; import com.facebook.hiveio.common.HiveType; import com.facebook.hiveio.input.HiveInputDescription; +import com.facebook.hiveio.input.parser.Records; import com.facebook.hiveio.record.HiveReadableRecord; import com.facebook.hiveio.schema.HiveTableSchema; -import com.google.common.base.Preconditions; /** * Simple HiveToVertex that reads vertices with integer IDs, Double vertex @@ -37,9 +37,9 @@ public class HiveIntDoubleDoubleVertex extends SimpleHiveToVertex<IntWritable, DoubleWritable, DoubleWritable> { @Override public void checkInput(HiveInputDescription inputDesc, HiveTableSchema schema) { - Preconditions.checkArgument(schema.columnType(0) == HiveType.INT); - Preconditions.checkArgument(schema.columnType(1) == HiveType.DOUBLE); - Preconditions.checkArgument(schema.columnType(2) == HiveType.MAP); + Records.verifyType(0, HiveType.INT, schema); + Records.verifyType(1, HiveType.DOUBLE, schema); + Records.verifyType(2, HiveType.MAP, schema); } @Override public Iterable<Edge<IntWritable, DoubleWritable>> getEdges( @@ -49,11 +49,11 @@ public class HiveIntDoubleDoubleVertex extends SimpleHiveToVertex<IntWritable, @Override public IntWritable getVertexId(HiveReadableRecord record) { - return HiveParsing.parseIntID(record, 0); + return HiveParsing.parseIntID(record, 0, getReusableVertexId()); } @Override public DoubleWritable getVertexValue(HiveReadableRecord record) { - return HiveParsing.parseDoubleWritable(record, 1); + return HiveParsing.parseDoubleWritable(record, 1, getReusableVertexValue()); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/768b7917/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java index cc61441..526a9a7 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java @@ -19,9 +19,9 @@ package org.apache.giraph.hive.input.vertex.examples; import com.facebook.hiveio.common.HiveType; import com.facebook.hiveio.input.HiveInputDescription; +import com.facebook.hiveio.input.parser.Records; import com.facebook.hiveio.record.HiveReadableRecord; import com.facebook.hiveio.schema.HiveTableSchema; -import com.google.common.base.Preconditions; import org.apache.giraph.edge.Edge; import org.apache.giraph.hive.common.HiveParsing; import org.apache.giraph.hive.input.vertex.SimpleHiveToVertex; @@ -36,8 +36,8 @@ public class HiveIntIntNullVertex extends SimpleHiveToVertex<IntWritable, IntWritable, NullWritable> { @Override public void checkInput(HiveInputDescription inputDesc, HiveTableSchema schema) { - Preconditions.checkArgument(schema.columnType(0) == HiveType.INT); - Preconditions.checkArgument(schema.columnType(1) == HiveType.LIST); + Records.verifyType(0, HiveType.INT, schema); + Records.verifyType(1, HiveType.LIST, schema); } @Override @@ -48,11 +48,11 @@ public class HiveIntIntNullVertex @Override public IntWritable getVertexId(HiveReadableRecord record) { - return HiveParsing.parseIntID(record, 0); + return HiveParsing.parseIntID(record, 0, getReusableVertexId()); } @Override public IntWritable getVertexValue(HiveReadableRecord record) { - return new IntWritable(); + return getReusableVertexValue(); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/768b7917/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java index 02ad6a1..9148c57 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java @@ -25,9 +25,9 @@ import org.apache.hadoop.io.NullWritable; import com.facebook.hiveio.common.HiveType; import com.facebook.hiveio.input.HiveInputDescription; +import com.facebook.hiveio.input.parser.Records; import com.facebook.hiveio.record.HiveReadableRecord; import com.facebook.hiveio.schema.HiveTableSchema; -import com.google.common.base.Preconditions; /** * Simple HiveToVertex that reads vertices with integer IDs, no vertex values, @@ -37,8 +37,8 @@ public class HiveIntNullNullVertex extends SimpleHiveToVertex<IntWritable, NullWritable, NullWritable> { @Override public void checkInput(HiveInputDescription inputDesc, HiveTableSchema schema) { - Preconditions.checkArgument(schema.columnType(0) == HiveType.INT); - Preconditions.checkArgument(schema.columnType(1) == HiveType.LIST); + Records.verifyType(0, HiveType.INT, schema); + Records.verifyType(1, HiveType.LIST, schema); } @Override @@ -47,11 +47,13 @@ public class HiveIntNullNullVertex return HiveParsing.parseIntNullEdges(record, 1); } - @Override public IntWritable getVertexId(HiveReadableRecord record) { - return HiveParsing.parseIntID(record, 0); + @Override + public IntWritable getVertexId(HiveReadableRecord record) { + return HiveParsing.parseIntID(record, 0, getReusableVertexId()); } - @Override public NullWritable getVertexValue(HiveReadableRecord record) { + @Override + public NullWritable getVertexValue(HiveReadableRecord record) { return NullWritable.get(); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/768b7917/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToEdge.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToEdge.java index c541232..1fd5fd0 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToEdge.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToEdge.java @@ -85,21 +85,21 @@ public class JythonHiveToEdge<I extends WritableComparable, E extends Writable> @Override public I getSourceVertexId(HiveReadableRecord record) { - I sourceId = getConf().getVertexIdFactory().newInstance(); + I sourceId = getReusableSourceVertexId(); sourceIdReader.readFields(sourceId, record); return sourceId; } @Override public I getTargetVertexId(HiveReadableRecord record) { - I targetId = getConf().getVertexIdFactory().newInstance(); + I targetId = getReusableTargetVertexId(); targetIdReader.readFields(targetId, record); return targetId; } @Override public E getEdgeValue(HiveReadableRecord record) { - E edgeValue = getConf().getEdgeValueFactory().newInstance(); + E edgeValue = getReusableEdgeValue(); edgeValueReader.readFields(edgeValue, record); return edgeValue; } http://git-wip-us.apache.org/repos/asf/giraph/blob/768b7917/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToVertex.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToVertex.java index f449ec5..b24283e 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToVertex.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToVertex.java @@ -85,14 +85,14 @@ public class JythonHiveToVertex<I extends WritableComparable, @Override public I getVertexId(HiveReadableRecord record) { - I vertexId = getConf().getVertexIdFactory().newInstance(); + I vertexId = getReusableVertexId(); vertexIdReader.readFields(vertexId, record); return vertexId; } @Override public V getVertexValue(HiveReadableRecord record) { - V vertexValue = getConf().getVertexValueFactory().newInstance(); + V vertexValue = getReusableVertexValue(); vertexValueReader.readFields(vertexValue, record); return vertexValue; }
