Repository: giraph
Updated Branches:
  refs/heads/trunk 06de6c48a -> 8bf08f545


[GIRAPH-1117] Provide a flexible way to decide whether to create vertex when it 
is not present in the input

Test Plan: run hello pagerank with this feature on and off

Reviewers: majakabiljo, maja.kabiljo, dionysis.logothetis

Reviewed By: dionysis.logothetis

Differential Revision: https://reviews.facebook.net/D64485


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8bf08f54
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8bf08f54
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8bf08f54

Branch: refs/heads/trunk
Commit: 8bf08f545d15b39e43874a2b1ac5b0586a917a4f
Parents: 06de6c4
Author: Sergey Edunov <edu...@fb.com>
Authored: Thu Sep 29 16:54:18 2016 -0700
Committer: Sergey Edunov <edu...@fb.com>
Committed: Thu Sep 29 16:54:18 2016 -0700

----------------------------------------------------------------------
 .../org/apache/giraph/conf/GiraphConstants.java | 14 +++++
 .../apache/giraph/edge/AbstractEdgeStore.java   | 13 +++-
 .../giraph/edge/CreateSourceVertexCallback.java | 42 +++++++++++++
 .../edge/DefaultCreateSourceVertexCallback.java | 50 +++++++++++++++
 .../java/org/apache/giraph/utils/TestGraph.java | 33 +++++-----
 .../giraph/io/TestCreateSourceVertex.java       | 65 ++++++++++++++++++++
 6 files changed, 199 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/8bf08f54/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index b384261..437d08a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -27,6 +27,8 @@ import 
org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
 import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.edge.DefaultCreateSourceVertexCallback;
+import org.apache.giraph.edge.CreateSourceVertexCallback;
 import org.apache.giraph.edge.EdgeStoreFactory;
 import org.apache.giraph.edge.InMemoryEdgeStoreFactory;
 import org.apache.giraph.edge.OutEdges;
@@ -1118,6 +1120,18 @@ public interface GiraphConstants {
           "necessarily in vertex input");
 
   /**
+   * Defines a callback that can be used to decide at runtime
+   * whether the source vertex should be created or not.
+   */
+  ClassConfOption<CreateSourceVertexCallback>
+      CREATE_EDGE_SOURCE_VERTICES_CALLBACK =
+      ClassConfOption.create("giraph.createEdgeSourceVerticesCallback",
+          DefaultCreateSourceVertexCallback.class,
+          CreateSourceVertexCallback.class,
+          "Decide whether we should create a source vertex when id is " +
+              "present in the edge input but not in vertex input");
+
+  /**
    * This counter group will contain one counter whose name is the ZooKeeper
    * server:port which this job is using
    */

http://git-wip-us.apache.org/repos/asf/giraph/blob/8bf08f54/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java 
b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
index 104cae2..d2e7e8d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
@@ -21,6 +21,7 @@ package org.apache.giraph.edge;
 import com.google.common.collect.MapMaker;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.ooc.OutOfCoreEngine;
@@ -81,6 +82,8 @@ public abstract class AbstractEdgeStore<I extends 
WritableComparable,
   protected boolean useInputOutEdges;
   /** Whether we spilled edges on disk */
   private boolean hasEdgesOnDisk = false;
+  /** Callback deciding whether a missing source vertex gets created */
+  private CreateSourceVertexCallback<I> createSourceVertexCallback;
 
   /**
    * Constructor.
@@ -100,6 +103,9 @@ public abstract class AbstractEdgeStore<I extends 
WritableComparable,
       configuration.getNettyServerExecutionConcurrency()).makeMap();
     reuseEdgeObjects = configuration.reuseEdgeObjects();
     useInputOutEdges = configuration.useInputOutEdges();
+    createSourceVertexCallback =
+        GiraphConstants.CREATE_EDGE_SOURCE_VERTICES_CALLBACK
+            .newInstance(configuration);
   }
 
   /**
@@ -247,7 +253,6 @@ public abstract class AbstractEdgeStore<I extends 
WritableComparable,
 
   @Override
   public void moveEdgesToVertices() {
-    final boolean createSourceVertex = configuration.getCreateSourceVertex();
     if (transientEdges.isEmpty() && !hasEdgesOnDisk) {
       if (LOG.isInfoEnabled()) {
         LOG.info("moveEdgesToVertices: No edges to move");
@@ -256,7 +261,8 @@ public abstract class AbstractEdgeStore<I extends 
WritableComparable,
     }
 
     if (LOG.isInfoEnabled()) {
-      LOG.info("moveEdgesToVertices: Moving incoming edges to vertices.");
+      LOG.info("moveEdgesToVertices: Moving incoming edges to " +
+          "vertices. Using " + createSourceVertexCallback);
     }
 
     service.getPartitionStore().startIteration();
@@ -307,7 +313,8 @@ public abstract class AbstractEdgeStore<I extends 
WritableComparable,
                 // If the source vertex doesn't exist, create it. Otherwise,
                 // just set the edges.
                 if (vertex == null) {
-                  if (createSourceVertex) {
+                  if (createSourceVertexCallback
+                      .shouldCreateSourceVertex(vertexId)) {
                     // createVertex only if it is allowed by configuration
                     vertex = configuration.createVertex();
                     vertex.initialize(createVertexId(entry),

http://git-wip-us.apache.org/repos/asf/giraph/blob/8bf08f54/giraph-core/src/main/java/org/apache/giraph/edge/CreateSourceVertexCallback.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/edge/CreateSourceVertexCallback.java
 
b/giraph-core/src/main/java/org/apache/giraph/edge/CreateSourceVertexCallback.java
new file mode 100644
index 0000000..9d6ed1b
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/edge/CreateSourceVertexCallback.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.giraph.conf.GiraphConfigurationSettable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Implementations of this interface can decide whether
+ * we should create a vertex when it is not present in vertex input
+ * but exists in edge input.
+ *
+ * @param <I> vertex id
+ */
+public interface CreateSourceVertexCallback<I extends Writable>
+    extends GiraphConfigurationSettable {
+
+  /**
+   * Should we create a vertex that doesn't exist in vertex input
+   * but only exists in edge input
+   * @param vertexId the id of vertex to be created
+   * @return true if we should create a vertex
+   */
+  boolean shouldCreateSourceVertex(I vertexId);
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8bf08f54/giraph-core/src/main/java/org/apache/giraph/edge/DefaultCreateSourceVertexCallback.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/edge/DefaultCreateSourceVertexCallback.java
 
b/giraph-core/src/main/java/org/apache/giraph/edge/DefaultCreateSourceVertexCallback.java
new file mode 100644
index 0000000..19ed598
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/edge/DefaultCreateSourceVertexCallback.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Default implementation of vertex creation decision maker.
+ * By default you can either create all vertices or not create
+ * implicit vertices at all.
+ *
+ * @param <I> Vertex id
+ */
+public class DefaultCreateSourceVertexCallback<I extends Writable>
+    implements CreateSourceVertexCallback<I> {
+  /**
+   * True if giraph has to create even vertices that only exist
+   * in edge input
+   */
+  private boolean shouldCreateVertices;
+
+  @Override
+  public boolean shouldCreateSourceVertex(I vertexId) {
+    return shouldCreateVertices;
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+    shouldCreateVertices =
+        GiraphConstants.CREATE_EDGE_SOURCE_VERTICES.get(configuration);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8bf08f54/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
index 46f7b48..363d4c0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
@@ -24,7 +24,9 @@ import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.CreateSourceVertexCallback;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.graph.Vertex;
@@ -54,6 +56,8 @@ public class TestGraph<I extends WritableComparable,
   protected Basic2ObjectMap<I, Vertex<I, V, E>> vertices;
   /** The configuration */
   protected ImmutableClassesGiraphConfiguration<I, V, E> conf;
+  /** Callback that makes a decision on whether vertex should be created */
+  private CreateSourceVertexCallback<I> createSourceVertexCallback;
 
   /**
    * Constructor requiring classes
@@ -62,6 +66,9 @@ public class TestGraph<I extends WritableComparable,
    */
   public TestGraph(GiraphConfiguration conf) {
     this.conf = new ImmutableClassesGiraphConfiguration<>(conf);
+    createSourceVertexCallback =
+        GiraphConstants.CREATE_EDGE_SOURCE_VERTICES_CALLBACK
+            .newInstance(this.conf);
     vertexValueCombiner = this.conf.createVertexValueCombiner();
     vertices = BasicCollectionsUtils.create2ObjectMap(
       this.conf.getVertexIdClass()
@@ -147,21 +154,13 @@ public class TestGraph<I extends WritableComparable,
 
   /**
    * Add an edge to an existing vertex
-   *
+   *
    * @param vertexId Edge origin
    * @param edgePair The edge
    * @return this
    */
   public TestGraph<I, V, E> addEdge(I vertexId, Entry<I, E> edgePair) {
-    if (!vertices.containsKey(vertexId)) {
-      Vertex<I, V, E> v = conf.createVertex();
-      v.initialize(vertexId, conf.createVertexValue());
-      vertices.put(vertexId, v);
-    }
-    vertices.get(vertexId)
-      .addEdge(EdgeFactory.create(edgePair.getKey(),
-                                               edgePair.getValue()));
-    return this;
+    return addEdge(vertexId, edgePair.getKey(), edgePair.getValue());
   }
 
   /**
@@ -174,12 +173,16 @@ public class TestGraph<I extends WritableComparable,
    */
   public TestGraph<I, V, E> addEdge(I vertexId, I toVertex, E edgeValue) {
     if (!vertices.containsKey(vertexId)) {
-      Vertex<I, V, E> v = conf.createVertex();
-      v.initialize(vertexId, conf.createVertexValue());
-      vertices.put(vertexId, v);
+      if (createSourceVertexCallback.shouldCreateSourceVertex(vertexId)) {
+        Vertex<I, V, E> v = conf.createVertex();
+        v.initialize(vertexId, conf.createVertexValue());
+        vertices.put(vertexId, v);
+      }
+    }
+    Vertex<I, V, E> v = vertices.get(vertexId);
+    if (v != null) {
+      v.addEdge(EdgeFactory.create(toVertex, edgeValue));
     }
-    vertices.get(vertexId)
-      .addEdge(EdgeFactory.create(toVertex, edgeValue));
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8bf08f54/giraph-core/src/test/java/org/apache/giraph/io/TestCreateSourceVertex.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/test/java/org/apache/giraph/io/TestCreateSourceVertex.java 
b/giraph-core/src/test/java/org/apache/giraph/io/TestCreateSourceVertex.java
index 039e975..35c4390 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestCreateSourceVertex.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestCreateSourceVertex.java
@@ -20,13 +20,16 @@ package org.apache.giraph.io;
 
 import com.google.common.collect.Maps;
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.edge.DefaultCreateSourceVertexCallback;
 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
 import org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat;
 import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
 import org.apache.giraph.utils.ComputationCountEdges;
 import org.apache.giraph.utils.IntIntNullNoOpComputation;
 import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.hadoop.io.IntWritable;
 import org.junit.Test;
 
 import java.util.Map;
@@ -133,6 +136,68 @@ public class TestCreateSourceVertex {
     assertEquals(1, (int) values.get(7));
   }
 
+  @Test
+  public void testCustomCreateSourceVertex() throws Exception {
+    String [] vertices = new String[] {
+        "1 0",
+        "2 0",
+        "3 0",
+        "4 0",
+    };
+    String [] edges = new String[] {
+        "1 2",
+        "1 5",
+        "2 4",
+        "2 1",
+        "3 4",
+        "4 1",
+        "4 5",
+        "6 2",
+        "7 8",
+        "4 8",
+    };
+
+    GiraphConfiguration conf = getConf();
+    GiraphConstants.CREATE_EDGE_SOURCE_VERTICES_CALLBACK.set(conf,
+        CreateEvenSourceVerticesCallback.class);
+
+    Iterable<String> results = InternalVertexRunner.run(conf, vertices, edges);
+    Map<Integer, Integer> values = parseResults(results);
+
+    // Check that the output has the 4 input vertices plus created vertex 6
+    assertEquals(5, values.size());
+    // Check that output ids are the vertex input ids plus even source id 6
+    assertTrue(values.containsKey(1));
+    assertTrue(values.containsKey(2));
+    assertTrue(values.containsKey(3));
+    assertTrue(values.containsKey(4));
+    assertTrue(values.containsKey(6));
+
+    conf.setComputationClass(ComputationCountEdges.class);
+    results = InternalVertexRunner.run(conf, vertices, edges);
+    values = parseResults(results);
+
+    // Check the number of edges of each vertex
+    assertEquals(2, (int) values.get(1));
+    assertEquals(2, (int) values.get(2));
+    assertEquals(1, (int) values.get(3));
+    assertEquals(3, (int) values.get(4));
+    assertEquals(1, (int) values.get(6));
+  }
+
+  /**
+   * Only allows creation of source vertices with even ids.
+   */
+  public static class CreateEvenSourceVerticesCallback extends
+      DefaultCreateSourceVertexCallback<IntWritable> {
+
+    @Override
+    public boolean shouldCreateSourceVertex(IntWritable vertexId) {
+      return vertexId.get()  % 2 == 0;
+    }
+  }
+
+
   private GiraphConfiguration getConf() {
     GiraphConfiguration conf = new GiraphConfiguration();
     conf.setComputationClass(IntIntNullNoOpComputation.class);

Reply via email to