Repository: giraph Updated Branches: refs/heads/trunk 06de6c48a -> 8bf08f545
[GIRAPH-1117] Provide a flexible way to decide whether to create vertex when it is not present in the input Test Plan: run hello pagerank with this feature on and off Reviewers: majakabiljo, maja.kabiljo, dionysis.logothetis Reviewed By: dionysis.logothetis Differential Revision: https://reviews.facebook.net/D64485 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8bf08f54 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8bf08f54 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8bf08f54 Branch: refs/heads/trunk Commit: 8bf08f545d15b39e43874a2b1ac5b0586a917a4f Parents: 06de6c4 Author: Sergey Edunov <edu...@fb.com> Authored: Thu Sep 29 16:54:18 2016 -0700 Committer: Sergey Edunov <edu...@fb.com> Committed: Thu Sep 29 16:54:18 2016 -0700 ---------------------------------------------------------------------- .../org/apache/giraph/conf/GiraphConstants.java | 14 +++++ .../apache/giraph/edge/AbstractEdgeStore.java | 13 +++- .../giraph/edge/CreateSourceVertexCallback.java | 42 +++++++++++++ .../edge/DefaultCreateSourceVertexCallback.java | 50 +++++++++++++++ .../java/org/apache/giraph/utils/TestGraph.java | 33 +++++----- .../giraph/io/TestCreateSourceVertex.java | 65 ++++++++++++++++++++ 6 files changed, 199 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/8bf08f54/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index b384261..437d08a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -27,6 +27,8 @@ import 
org.apache.giraph.comm.messages.InMemoryMessageStoreFactory; import org.apache.giraph.comm.messages.MessageEncodeAndStoreType; import org.apache.giraph.comm.messages.MessageStoreFactory; import org.apache.giraph.edge.ByteArrayEdges; +import org.apache.giraph.edge.DefaultCreateSourceVertexCallback; +import org.apache.giraph.edge.CreateSourceVertexCallback; import org.apache.giraph.edge.EdgeStoreFactory; import org.apache.giraph.edge.InMemoryEdgeStoreFactory; import org.apache.giraph.edge.OutEdges; @@ -1118,6 +1120,18 @@ public interface GiraphConstants { "necessarily in vertex input"); /** + * Defines a callback that can be used to decide at runtime + * whether a missing source vertex should be created or not. + */ + ClassConfOption<CreateSourceVertexCallback> + CREATE_EDGE_SOURCE_VERTICES_CALLBACK = + ClassConfOption.create("giraph.createEdgeSourceVerticesCallback", + DefaultCreateSourceVertexCallback.class, + CreateSourceVertexCallback.class, + "Decide whether we should create a source vertex when id is " + + "present in the edge input but not in vertex input"); + + /** * This counter group will contain one counter whose name is the ZooKeeper * server:port which this job is using */ http://git-wip-us.apache.org/repos/asf/giraph/blob/8bf08f54/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java index 104cae2..d2e7e8d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java @@ -21,6 +21,7 @@ package org.apache.giraph.edge; import com.google.common.collect.MapMaker; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; +import 
org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.Vertex; import org.apache.giraph.ooc.OutOfCoreEngine; @@ -81,6 +82,8 @@ public abstract class AbstractEdgeStore<I extends WritableComparable, protected boolean useInputOutEdges; /** Whether we spilled edges on disk */ private boolean hasEdgesOnDisk = false; + /** Create source vertices */ + private CreateSourceVertexCallback<I> createSourceVertexCallback; /** * Constructor. @@ -100,6 +103,9 @@ public abstract class AbstractEdgeStore<I extends WritableComparable, configuration.getNettyServerExecutionConcurrency()).makeMap(); reuseEdgeObjects = configuration.reuseEdgeObjects(); useInputOutEdges = configuration.useInputOutEdges(); + createSourceVertexCallback = + GiraphConstants.CREATE_EDGE_SOURCE_VERTICES_CALLBACK + .newInstance(configuration); } /** @@ -247,7 +253,6 @@ public abstract class AbstractEdgeStore<I extends WritableComparable, @Override public void moveEdgesToVertices() { - final boolean createSourceVertex = configuration.getCreateSourceVertex(); if (transientEdges.isEmpty() && !hasEdgesOnDisk) { if (LOG.isInfoEnabled()) { LOG.info("moveEdgesToVertices: No edges to move"); @@ -256,7 +261,8 @@ public abstract class AbstractEdgeStore<I extends WritableComparable, } if (LOG.isInfoEnabled()) { - LOG.info("moveEdgesToVertices: Moving incoming edges to vertices."); + LOG.info("moveEdgesToVertices: Moving incoming edges to " + + "vertices. Using " + createSourceVertexCallback); } service.getPartitionStore().startIteration(); @@ -307,7 +313,8 @@ public abstract class AbstractEdgeStore<I extends WritableComparable, // If the source vertex doesn't exist, create it. Otherwise, // just set the edges. 
if (vertex == null) { - if (createSourceVertex) { + if (createSourceVertexCallback + .shouldCreateSourceVertex(vertexId)) { // createVertex only if it is allowed by configuration vertex = configuration.createVertex(); vertex.initialize(createVertexId(entry), http://git-wip-us.apache.org/repos/asf/giraph/blob/8bf08f54/giraph-core/src/main/java/org/apache/giraph/edge/CreateSourceVertexCallback.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/CreateSourceVertexCallback.java b/giraph-core/src/main/java/org/apache/giraph/edge/CreateSourceVertexCallback.java new file mode 100644 index 0000000..9d6ed1b --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/edge/CreateSourceVertexCallback.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.edge; + +import org.apache.giraph.conf.GiraphConfigurationSettable; +import org.apache.hadoop.io.Writable; + +/** + * Implementations of this interface can decide whether + * we should create a vertex when it is not present in vertex input + * but exists in edge input. 
+ * + * @param <I> vertex id + */ +public interface CreateSourceVertexCallback<I extends Writable> + extends GiraphConfigurationSettable { + + /** + * Should we create a vertex that doesn't exist in vertex input + * but only exists in edge input + * @param vertexId the id of vertex to be created + * @return true if we should create a vertex + */ + boolean shouldCreateSourceVertex(I vertexId); + +} http://git-wip-us.apache.org/repos/asf/giraph/blob/8bf08f54/giraph-core/src/main/java/org/apache/giraph/edge/DefaultCreateSourceVertexCallback.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/DefaultCreateSourceVertexCallback.java b/giraph-core/src/main/java/org/apache/giraph/edge/DefaultCreateSourceVertexCallback.java new file mode 100644 index 0000000..19ed598 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/edge/DefaultCreateSourceVertexCallback.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.edge; + +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.hadoop.io.Writable; + +/** + * Default implementation of vertex creation decision maker. + * By default you can either create all vertices or not create + * implicit vertices at all. + * + * @param <I> Vertex id + */ +public class DefaultCreateSourceVertexCallback<I extends Writable> + implements CreateSourceVertexCallback<I> { + /** + * True if giraph has to create even vertices that only exist + * in edge input + */ + private boolean shouldCreateVertices; + + @Override + public boolean shouldCreateSourceVertex(I vertexId) { + return shouldCreateVertices; + } + + @Override + public void setConf(ImmutableClassesGiraphConfiguration configuration) { + shouldCreateVertices = + GiraphConstants.CREATE_EDGE_SOURCE_VERTICES.get(configuration); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/8bf08f54/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java index 46f7b48..363d4c0 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java @@ -24,7 +24,9 @@ import java.util.List; import java.util.Map.Entry; import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.CreateSourceVertexCallback; import org.apache.giraph.edge.Edge; import org.apache.giraph.edge.EdgeFactory; import org.apache.giraph.graph.Vertex; @@ -54,6 +56,8 @@ public class TestGraph<I extends WritableComparable, protected Basic2ObjectMap<I, Vertex<I, V, E>> vertices; /** The 
configuration */ protected ImmutableClassesGiraphConfiguration<I, V, E> conf; + /** Callback that makes a decision on whether vertex should be created */ + private CreateSourceVertexCallback<I> createSourceVertexCallback; /** * Constructor requiring classes @@ -62,6 +66,9 @@ public class TestGraph<I extends WritableComparable, */ public TestGraph(GiraphConfiguration conf) { this.conf = new ImmutableClassesGiraphConfiguration<>(conf); + createSourceVertexCallback = + GiraphConstants.CREATE_EDGE_SOURCE_VERTICES_CALLBACK + .newInstance(this.conf); vertexValueCombiner = this.conf.createVertexValueCombiner(); vertices = BasicCollectionsUtils.create2ObjectMap( this.conf.getVertexIdClass() @@ -147,21 +154,13 @@ public class TestGraph<I extends WritableComparable, /** * Add an edge to an existing vertex - * + * * @param vertexId Edge origin * @param edgePair The edge * @return this */ public TestGraph<I, V, E> addEdge(I vertexId, Entry<I, E> edgePair) { - if (!vertices.containsKey(vertexId)) { - Vertex<I, V, E> v = conf.createVertex(); - v.initialize(vertexId, conf.createVertexValue()); - vertices.put(vertexId, v); - } - vertices.get(vertexId) - .addEdge(EdgeFactory.create(edgePair.getKey(), - edgePair.getValue())); - return this; + return addEdge(vertexId, edgePair.getKey(), edgePair.getValue()); } /** @@ -174,12 +173,16 @@ public class TestGraph<I extends WritableComparable, */ public TestGraph<I, V, E> addEdge(I vertexId, I toVertex, E edgeValue) { if (!vertices.containsKey(vertexId)) { - Vertex<I, V, E> v = conf.createVertex(); - v.initialize(vertexId, conf.createVertexValue()); - vertices.put(vertexId, v); + if (createSourceVertexCallback.shouldCreateSourceVertex(vertexId)) { + Vertex<I, V, E> v = conf.createVertex(); + v.initialize(vertexId, conf.createVertexValue()); + vertices.put(vertexId, v); + } + } + Vertex<I, V, E> v = vertices.get(vertexId); + if (v != null) { + v.addEdge(EdgeFactory.create(toVertex, edgeValue)); } - vertices.get(vertexId) - 
.addEdge(EdgeFactory.create(toVertex, edgeValue)); return this; } http://git-wip-us.apache.org/repos/asf/giraph/blob/8bf08f54/giraph-core/src/test/java/org/apache/giraph/io/TestCreateSourceVertex.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestCreateSourceVertex.java b/giraph-core/src/test/java/org/apache/giraph/io/TestCreateSourceVertex.java index 039e975..35c4390 100644 --- a/giraph-core/src/test/java/org/apache/giraph/io/TestCreateSourceVertex.java +++ b/giraph-core/src/test/java/org/apache/giraph/io/TestCreateSourceVertex.java @@ -20,13 +20,16 @@ package org.apache.giraph.io; import com.google.common.collect.Maps; import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.edge.ByteArrayEdges; +import org.apache.giraph.edge.DefaultCreateSourceVertexCallback; import org.apache.giraph.io.formats.IdWithValueTextOutputFormat; import org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat; import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat; import org.apache.giraph.utils.ComputationCountEdges; import org.apache.giraph.utils.IntIntNullNoOpComputation; import org.apache.giraph.utils.InternalVertexRunner; +import org.apache.hadoop.io.IntWritable; import org.junit.Test; import java.util.Map; @@ -133,6 +136,68 @@ public class TestCreateSourceVertex { assertEquals(1, (int) values.get(7)); } + @Test + public void testCustomCreateSourceVertex() throws Exception { + String [] vertices = new String[] { + "1 0", + "2 0", + "3 0", + "4 0", + }; + String [] edges = new String[] { + "1 2", + "1 5", + "2 4", + "2 1", + "3 4", + "4 1", + "4 5", + "6 2", + "7 8", + "4 8", + }; + + GiraphConfiguration conf = getConf(); + GiraphConstants.CREATE_EDGE_SOURCE_VERTICES_CALLBACK.set(conf, + CreateEvenSourceVerticesCallback.class); + + Iterable<String> results = InternalVertexRunner.run(conf, vertices, edges); + 
Map<Integer, Integer> values = parseResults(results); + + // Check that vertex-input vertices plus even-id edge sources are present + assertEquals(5, values.size()); + // Ids 1-4 come from vertex input; 6 is created by the callback (7 is not) + assertTrue(values.containsKey(1)); + assertTrue(values.containsKey(2)); + assertTrue(values.containsKey(3)); + assertTrue(values.containsKey(4)); + assertTrue(values.containsKey(6)); + + conf.setComputationClass(ComputationCountEdges.class); + results = InternalVertexRunner.run(conf, vertices, edges); + values = parseResults(results); + + // Check the number of edges of each vertex + assertEquals(2, (int) values.get(1)); + assertEquals(2, (int) values.get(2)); + assertEquals(1, (int) values.get(3)); + assertEquals(3, (int) values.get(4)); + assertEquals(1, (int) values.get(6)); + } + + /** + * Only allows creation of source vertices with even ids. + */ + public static class CreateEvenSourceVerticesCallback extends + DefaultCreateSourceVertexCallback<IntWritable> { + + @Override + public boolean shouldCreateSourceVertex(IntWritable vertexId) { + return vertexId.get() % 2 == 0; + } + } + + + private GiraphConfiguration getConf() { GiraphConfiguration conf = new GiraphConfiguration(); conf.setComputationClass(IntIntNullNoOpComputation.class);