Repository: giraph
Updated Branches:
  refs/heads/trunk 4a188d8d1 -> 47da75182


Merge seeded and unseeded BFS into a single BFS implementation

Summary: This change creates a general Breadth First Search version which 
supports the default BFS where distances to one or more seeds are computed. 
Additionally, this new version also supports assigning vertices to closest 
seeds for the purpose of clustering the vertices. This change provides a 
BlockFactory which highlights this functionality in addition to test cases.

Test Plan: Test cases for the new functionality added.

Reviewers: spupyrev, ikabiljo

Reviewed By: ikabiljo

Subscribers: dionysis.logothetis, sergey.edunov, maja.kabiljo

Differential Revision: https://reviews.facebook.net/D47985


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/47da7518
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/47da7518
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/47da7518

Branch: refs/heads/trunk
Commit: 47da751824e9a58cd51fe6b77acdfba566a8df79
Parents: 4a188d8
Author: Mayank Pundir <mpun...@fb.com>
Authored: Fri Oct 9 13:18:37 2015 -0700
Committer: Igor Kabiljo <ikabi...@fb.com>
Committed: Fri Oct 9 13:18:37 2015 -0700

----------------------------------------------------------------------
 .../library/algo/BreadthFirstSearch.java        | 217 ++++++++++---------
 ...MultiSeedBreadthFirstSearchBlockFactory.java | 155 +++++++++++++
 .../MultiSeedBreadthFirstSearchVertexValue.java |  58 +++++
 .../library/algo/TestBreadthFirstSearch.java    |  23 +-
 .../algo/TestMultiSeedBreadthFirstSearch.java   | 127 +++++++++++
 .../block_app/library/algo/VertexLongPair.java  |  73 +++++++
 .../giraph/block_app/reducers/TopNReduce.java   |  85 ++++++++
 7 files changed, 627 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/47da7518/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearch.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearch.java
 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearch.java
index fe290fb..d85fc12 100644
--- 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearch.java
+++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearch.java
@@ -34,168 +34,181 @@ import org.apache.giraph.edge.Edge;
 import org.apache.giraph.function.Consumer;
 import org.apache.giraph.function.ObjectTransfer;
 import org.apache.giraph.function.Supplier;
+import org.apache.giraph.function.TripleFunction;
 import org.apache.giraph.function.vertex.ConsumerWithVertex;
 import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.reducers.impl.SumReduce;
 import org.apache.giraph.types.NoMessage;
+import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Iterators;
 
+import java.util.Iterator;
+
 /**
  * Class for running breadth-first search on the graph.
- *
+ * <p>
  * Graph is expected to be symmetric before calling any of the methods here.
  */
 public class BreadthFirstSearch {
   private static final Logger LOG = Logger.getLogger(BreadthFirstSearch.class);
   private static final IntWritable NOT_REACHABLE_VERTEX_VALUE =
-      new IntWritable(-1);
+    new IntWritable(-1);
 
   private BreadthFirstSearch() {
   }
 
   /**
-   * Default block, which calculates connected components using the vertex's
-   * default edges.
+   * Default block for computing breadth-first search distances given functions
+   * isVertexInSeedSet, getDistance and setDistance. This BFS computation block
+   * computes only the shortest distance to seed vertices and does not compute
+   * the closest seed.
    */
-  public static <I extends WritableComparable, V extends Writable>
-  Block bfs(
+  public static <I extends WritableComparable, V extends Writable> Block bfs(
     SupplierFromVertex<I, V, Writable, Boolean> isVertexInSeedSet,
     SupplierFromVertex<I, V, Writable, IntWritable> getDistance,
-    ConsumerWithVertex<I, V, Writable, IntWritable> setDistance
-  ) {
+    ConsumerWithVertex<I, V, Writable, IntWritable> setDistance) {
     ObjectTransfer<Boolean> converged = new ObjectTransfer<>();
-    ObjectTransfer<Boolean> vertexUpdatedDistance = new ObjectTransfer<>();
+    ObjectTransfer<ByteWritable> vertexUpdatedDistance = new 
ObjectTransfer<>();
+
+    IntWritable reusableInt = new IntWritable();
+    ByteWritable emptyByteWritable = new ByteWritable();
+    SupplierFromVertex<I, V, Writable, ByteWritable>
+      initializeVertex = (vertex) -> {
+      if (isVertexInSeedSet.get(vertex)) {
+        reusableInt.set(0);
+        setDistance.apply(vertex, reusableInt);
+        return emptyByteWritable;
+      } else {
+        reusableInt.set(-1);
+        setDistance.apply(vertex, reusableInt);
+        return null;
+      }
+    };
+
+    IntWritable notReachableVertex = new IntWritable(-1);
+    TripleFunction<Vertex<I, V, Writable>, IntWritable, Iterator<ByteWritable>,
+      ByteWritable> traverseVertex = (vertex, distance, messageIter) -> {
+      if (getDistance.get(vertex).compareTo(notReachableVertex) == 0 ||
+        getDistance.get(vertex).compareTo(distance) > 0) {
+        setDistance.apply(vertex, distance);
+        return emptyByteWritable;
+      } else {
+        return null;
+      }
+    };
 
+    Class<ByteWritable> messageClass = ByteWritable.class;
     return new SequenceBlock(
-      createInitializePiece(
-        vertexUpdatedDistance,
-        isVertexInSeedSet,
-        getDistance,
-        setDistance,
-        VertexSuppliers.vertexEdgesSupplier()
-      ),
+      createInitializePiece(vertexUpdatedDistance, initializeVertex),
       RepeatUntilBlock.unlimited(
-        createPropagateConnectedComponentsPiece(
-          vertexUpdatedDistance,
-          vertexUpdatedDistance,
-          converged,
-          getDistance,
-          setDistance,
-          VertexSuppliers.vertexEdgesSupplier()
-        ),
-        converged
-      )
-    );
+        createPropagateConnectedComponentsPiece(messageClass,
+          vertexUpdatedDistance, vertexUpdatedDistance, converged,
+          traverseVertex, VertexSuppliers.vertexEdgesSupplier()), converged));
   }
 
   /**
-   * Initialize vertex values for connected components calculation
+   * Default block for computing breadth-first search distances given functions
+   * initializeVertex and traverseVertex. This BFS computation block computes
+   * both the shortest distance to seed vertices as well as the closest seed
+   * for all vertices.
    */
-  private static <I extends WritableComparable, V extends Writable>
+  public static
+  <I extends WritableComparable, V extends Writable, M extends Writable>
+  Block bfs(Class<M> messageType,
+    SupplierFromVertex<I, V, Writable, M> initializeVertex,
+    TripleFunction<Vertex<I, V, Writable>, IntWritable, Iterator<M>, M>
+      traverseVertex) {
+    ObjectTransfer<Boolean> converged = new ObjectTransfer<>();
+    ObjectTransfer<M> vertexUpdatedDistance = new ObjectTransfer<>();
+
+    return new SequenceBlock(
+      createInitializePiece(vertexUpdatedDistance, initializeVertex),
+      RepeatUntilBlock.unlimited(
+        createPropagateConnectedComponentsPiece(messageType,
+          vertexUpdatedDistance, vertexUpdatedDistance, converged,
+          traverseVertex, VertexSuppliers.vertexEdgesSupplier()), converged));
+  }
+
+  /**
+   * Initialize vertex values for BFS computation.
+   */
+  private static
+  <I extends WritableComparable, V extends Writable, M extends Writable>
   Piece<I, V, Writable, NoMessage, Object> createInitializePiece(
-    Consumer<Boolean> vertexUpdatedDistance,
-    SupplierFromVertex<I, V, Writable, Boolean> isVertexInSeedSet,
-    SupplierFromVertex<I, V, Writable, IntWritable> getDistance,
-    ConsumerWithVertex<I, V, Writable, IntWritable> setDistance,
-    SupplierFromVertex<I, V, Writable, ? extends Iterable<? extends Edge<I, 
?>>>
-      edgeSupplier
-  ) {
-    IntWritable zero = new IntWritable(0);
-    return Pieces.forAllVerticesOnReceive(
-      "InitializeBFS",
-      (vertex) -> {
-        if (isVertexInSeedSet.get(vertex)) {
-          setDistance.apply(vertex, zero);
-          vertexUpdatedDistance.apply(true);
-        } else {
-          setDistance.apply(vertex, NOT_REACHABLE_VERTEX_VALUE);
-          vertexUpdatedDistance.apply(false);
-        }
-      }
-    );
+    Consumer<M> vertexUpdatedDistance,
+    SupplierFromVertex<I, V, Writable, M> initializeVertex) {
+    return Pieces.forAllVerticesOnReceive("InitializeBFS", (vertex) -> {
+      vertexUpdatedDistance.apply(initializeVertex.get(vertex));
+    });
   }
 
   /**
-   * Propagate connected components to neighbor pieces
+   * Propagate shortest distance to neighbor pieces using connected
+   * components piece.
    */
-  private static <I extends WritableComparable, V extends Writable>
+  private static
+  <I extends WritableComparable, V extends Writable, M extends Writable>
   Block createPropagateConnectedComponentsPiece(
-      Supplier<Boolean> vertexToPropagate,
-      Consumer<Boolean> vertexUpdatedDistance,
-      Consumer<Boolean> converged,
-      SupplierFromVertex<I, V, Writable, IntWritable> getDistance,
-      ConsumerWithVertex<I, V, Writable, IntWritable> setDistance,
-      SupplierFromVertex<I, V, Writable, ? extends Iterable<?
-        extends Edge<I, ?>>> edgeSupplier) {
-    return new Piece<I, V, Writable, IntWritable, Object>() {
+    Class<M> messageClass, Supplier<M> vertexToPropagate,
+    Consumer<M> vertexUpdatedDistance, Consumer<Boolean> converged,
+    TripleFunction<Vertex<I, V, Writable>, IntWritable, Iterator<M>, M>
+      traverseVertex,
+    SupplierFromVertex<I, V, Writable, ? extends Iterable<?
+      extends Edge<I, ?>>> edgeSupplier) {
+    return new Piece<I, V, Writable, M, Object>() {
+      private IntWritable globalDistance = new IntWritable(0);
       private ReducerHandle<IntWritable, IntWritable> propagatedAggregator;
 
-      @Override
-      public void registerReducers(
-          CreateReducersApi reduceApi, Object executionStage) {
+      @Override public void registerReducers(CreateReducersApi reduceApi,
+        Object executionStage) {
         propagatedAggregator = reduceApi.createLocalReducer(SumReduce.INT);
       }
 
-      @Override
-      public VertexSender<I, V, Writable> getVertexSender(
-        BlockWorkerSendApi<I, V, Writable, IntWritable> workerApi,
-        Object executionStage
-      ) {
+      @Override public VertexSender<I, V, Writable> getVertexSender(
+        BlockWorkerSendApi<I, V, Writable, M> workerApi,
+        Object executionStage) {
         return (vertex) -> {
-          if (vertexToPropagate.get()) {
-            workerApi.sendMessageToMultipleEdges(
-              Iterators.transform(
-                edgeSupplier.get(vertex).iterator(),
-                Edge::getTargetVertexId
-              ),
-              getDistance.get(vertex)
-            );
+          M messageToSend = vertexToPropagate.get();
+          if (messageToSend != null) {
+            workerApi.sendMessageToMultipleEdges(Iterators
+              .transform(edgeSupplier.get(vertex).iterator(),
+                Edge::getTargetVertexId), messageToSend);
             reduceInt(propagatedAggregator, 1);
           }
         };
       }
 
-      @Override
-      public void masterCompute(BlockMasterApi master, Object executionStage) {
-        converged.apply(
-            propagatedAggregator.getReducedValue(master).get() == 0);
+      @Override public void masterCompute(BlockMasterApi master,
+        Object executionStage) {
+        converged
+          .apply(propagatedAggregator.getReducedValue(master).get() == 0);
         LOG.info("BFS: " + propagatedAggregator.getReducedValue(master).get() +
-                 " many vertices sent in this iteration");
+          " many vertices sent in this iteration");
+        globalDistance.set(globalDistance.get() + 1);
       }
 
-      @Override
-      public VertexReceiver<I, V, Writable, IntWritable> getVertexReceiver(
-        BlockWorkerReceiveApi<I> workerApi,
-        Object executionStage
-      ) {
-        IntWritable next = new IntWritable();
+      @Override public VertexReceiver<I, V, Writable, M> getVertexReceiver(
+        BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
         return (vertex, messages) -> {
-          vertexUpdatedDistance.apply(false);
-          for (IntWritable receivedValue : messages) {
-            IntWritable currentValue = getDistance.get(vertex);
-            next.set(receivedValue.get() + 1);
-            if (currentValue.compareTo(NOT_REACHABLE_VERTEX_VALUE) == 0 ||
-                currentValue.compareTo(next) > 0) {
-              setDistance.apply(vertex, next);
-              vertexUpdatedDistance.apply(true);
-            }
+          if (messages.iterator().hasNext()) {
+            vertexUpdatedDistance.apply(traverseVertex
+              .apply(vertex, globalDistance, messages.iterator()));
           }
         };
       }
 
-      @Override
-      public Class<IntWritable> getMessageClass() {
-        return IntWritable.class;
+      @Override public Class<M> getMessageClass() {
+        return messageClass;
       }
 
-      @Override
-      public String toString() {
-        return "PropagateConnectedComponentsPiece";
+      @Override public String toString() {
+        return "BreadthFirstSearchPiece";
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/47da7518/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/MultiSeedBreadthFirstSearchBlockFactory.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/MultiSeedBreadthFirstSearchBlockFactory.java
 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/MultiSeedBreadthFirstSearchBlockFactory.java
new file mode 100644
index 0000000..dcf1690
--- /dev/null
+++ 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/MultiSeedBreadthFirstSearchBlockFactory.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.algo;
+
+import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
+import org.apache.giraph.block_app.framework.AbstractBlockFactory;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.library.Pieces;
+import org.apache.giraph.block_app.reducers.TopNReduce;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.function.TripleFunction;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.writable.kryo.TransientRandom;
+import org.apache.hadoop.io.*;
+
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+/**
+ * Example Application of BFS calculation with multiple seeds
+ */
+public class MultiSeedBreadthFirstSearchBlockFactory
+  extends AbstractBlockFactory<Object> {
+  public static final IntConfOption RANDOM_SEED_COUNT =
+    new IntConfOption("multi_seed_bfs.random_seeds", 0,
+      "If using random seeds, the count of random seeds to be generated");
+  public static final StrConfOption SEED_LIST =
+    new StrConfOption("multi_seed_bfs.seed_list", "",
+      "List of comma separated IDs of the seed vertices");
+
+  private static
+  TripleFunction<Vertex<LongWritable,
+    MultiSeedBreadthFirstSearchVertexValue, Writable>, IntWritable,
+    Iterator<IntWritable>, IntWritable> traverseVertex() {
+    IntWritable notReachableVertex = new IntWritable(-1);
+    IntWritable vertexValue = new IntWritable();
+    IntWritable reservoirValue = new IntWritable();
+    TransientRandom random = new TransientRandom();
+    IntWritable reusableMessage = new IntWritable();
+    // Reservoir sampling to select the seed from the set of messages received
+    return (vertex, distance, messageIter) -> {
+      vertexValue.set(vertex.getValue().getDistance());
+      if (vertexValue.compareTo(notReachableVertex) == 0 ||
+        vertexValue.compareTo(distance) > 0) {
+        reservoirValue.set(messageIter.next().get());
+        int messageIndex = 1;
+        while (messageIter.hasNext()) {
+          if (random.nextInt(messageIndex + 1) < 1) {
+            reservoirValue.set(messageIter.next().get());
+          } else {
+            messageIter.next();
+          }
+          messageIndex++;
+        }
+        vertex.getValue().setSourceIndex(reservoirValue.get());
+        vertex.getValue().setDistance(distance.get());
+        reusableMessage.set(vertex.getValue().getSourceIndex());
+        return reusableMessage;
+      } else {
+        return null;
+      }
+    };
+  }
+
+  @Override public Block createBlock(GiraphConfiguration conf) {
+    Long2IntOpenHashMap seeds = new Long2IntOpenHashMap();
+    Piece pickSeedVertices = null;
+    if (RANDOM_SEED_COUNT.get(conf) > 0) {
+      TransientRandom random = new TransientRandom();
+      pickSeedVertices = Pieces.reduce("SeedSelection",
+        new TopNReduce<VertexLongPair<LongWritable>>(
+          RANDOM_SEED_COUNT.get(conf)), (vertex) -> {
+          return new VertexLongPair<LongWritable>((LongWritable) 
vertex.getId(),
+            random.get().nextLong());
+        }, (result) -> {
+          PriorityQueue<VertexLongPair<LongWritable>> queue = result.get();
+          int index = 0;
+          while (!queue.isEmpty()) {
+            VertexLongPair<LongWritable> nextPair = queue.poll();
+            seeds.put(nextPair.getVertex().get(), index++);
+          }
+        });
+    } else {
+      String[] sepStr = SEED_LIST.get(conf).split(", ");
+      for (int ii = 0; ii < sepStr.length; ++ii) {
+        seeds.put(Long.parseLong(sepStr[ii]), ii);
+      }
+    }
+
+    IntWritable reusableMessage = new IntWritable();
+    SupplierFromVertex<LongWritable, MultiSeedBreadthFirstSearchVertexValue,
+          Writable, IntWritable>
+      initializeVertex = (vertex) -> {
+      if (seeds.containsKey(vertex.getId().get())) {
+        vertex.getValue().setDistance(0);
+        vertex.getValue().setSourceIndex(seeds.get(vertex.getId().get()));
+        reusableMessage.set(vertex.getValue().getSourceIndex());
+        return reusableMessage;
+      } else {
+        vertex.getValue().setDistance(-1);
+        vertex.getValue().setSourceIndex(-1);
+        return null;
+      }
+    };
+
+    if (RANDOM_SEED_COUNT.get(conf) > 0) {
+      return new SequenceBlock(pickSeedVertices, BreadthFirstSearch
+        .bfs(IntWritable.class, initializeVertex, traverseVertex()));
+    } else {
+      return BreadthFirstSearch
+        .bfs(IntWritable.class, initializeVertex, traverseVertex());
+    }
+  }
+
+  @Override public Object createExecutionStage(GiraphConfiguration conf) {
+    return new Object();
+  }
+
+  @Override protected Class<LongWritable> getVertexIDClass(
+    GiraphConfiguration conf) {
+    return LongWritable.class;
+  }
+
+  @Override protected Class<MultiSeedBreadthFirstSearchVertexValue>
+  getVertexValueClass(
+    GiraphConfiguration conf) {
+    return MultiSeedBreadthFirstSearchVertexValue.class;
+  }
+
+  @Override protected Class<NullWritable> getEdgeValueClass(
+    GiraphConfiguration conf) {
+    return NullWritable.class;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/47da7518/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/MultiSeedBreadthFirstSearchVertexValue.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/MultiSeedBreadthFirstSearchVertexValue.java
 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/MultiSeedBreadthFirstSearchVertexValue.java
new file mode 100644
index 0000000..81d2ec5
--- /dev/null
+++ 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/MultiSeedBreadthFirstSearchVertexValue.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.algo;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Vertex-value class for MultiSeedBreadthFirstSearchBlockFactory.
+ */
+public class MultiSeedBreadthFirstSearchVertexValue implements Writable {
+  private int distance;
+  private int sourceIndex;
+
+  public int getSourceIndex() {
+    return this.sourceIndex;
+  }
+
+  public void setSourceIndex(int sourceID) {
+    this.sourceIndex = sourceID;
+  }
+
+  public int getDistance() {
+    return this.distance;
+  }
+
+  public void setDistance(int distance) {
+    this.distance = distance;
+  }
+
+  @Override public void write(DataOutput out) throws IOException {
+    out.writeInt(distance);
+    out.writeInt(sourceIndex);
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+    distance = in.readInt();
+    sourceIndex = in.readInt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/47da7518/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestBreadthFirstSearch.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestBreadthFirstSearch.java
 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestBreadthFirstSearch.java
index c19fe27..5361251 100644
--- 
a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestBreadthFirstSearch.java
+++ 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestBreadthFirstSearch.java
@@ -36,11 +36,13 @@ import org.junit.Test;
 
 public class TestBreadthFirstSearch {
   private void run(
-    TestGraphModifier<LongWritable, BreadthFirstSearchVertexValue, 
NullWritable> graphLoader,
+    TestGraphModifier<LongWritable, BreadthFirstSearchVertexValue, 
NullWritable>
+      graphLoader,
     int[] expectedDistances,
     int[] seedVertices
   ) throws Exception {
-    TestGraphModifier<LongWritable, BreadthFirstSearchVertexValue, 
NullWritable> valueLoader =
+    TestGraphModifier<LongWritable, BreadthFirstSearchVertexValue, 
NullWritable>
+      valueLoader =
       (graph) -> {
         List<Integer> seeds = Arrays.asList(ArrayUtils.toObject(seedVertices));
         for (int i = 0; i < graph.getVertexCount(); i++)
@@ -55,7 +57,8 @@ public class TestBreadthFirstSearch {
         }
       },
       (conf) -> {
-        BlockUtils.setBlockFactoryClass(conf, 
BreadthFirstSearchBlockFactory.class);
+        BlockUtils.setBlockFactoryClass(conf,
+          BreadthFirstSearchBlockFactory.class);
       }
     );
   }
@@ -97,21 +100,23 @@ public class TestBreadthFirstSearch {
 
   @Test
   public void testMultipleComponentGraphCloseSeeds() throws Exception {
-    int[] expected = {2, 1, 0, 1, 2, 3, 3, 3, 2, 2, 2, 2, 1, 0, 2, -1, -1, -1, 
-1, -1, -1};
+    int[] expected =
+      {2, 1, 0, 1, 2, 3, 3, 3, 2, 2, 2, 2, 1, 0, 2, -1, -1, -1, -1, -1, -1};
     int[] seeds = {13, 2};
     run(new Graph2Init(), expected, seeds);
   }
 
   @Test
   public void testMultipleComponentGraphFarSeeds() throws Exception {
-    int[] expected = {3, 2, 3, 2, 1, 0, 1, 2, 1, 2, 3, 3, 2, 3, 3, 3, 2, 1, 0, 
1, -1};
+    int[] expected =
+      {3, 2, 3, 2, 1, 0, 1, 2, 1, 2, 3, 3, 2, 3, 3, 3, 2, 1, 0, 1, -1};
     int[] seeds = {5, 18};
     run(new Graph2Init(), expected, seeds);
   }
 
 
-  public class Graph1Init<I extends WritableComparable, V extends Writable, E 
extends Writable>
-      implements TestGraphModifier<I, V, E> {
+  public class Graph1Init<I extends WritableComparable, V extends Writable,
+    E extends Writable> implements TestGraphModifier<I, V, E> {
 
     @Override
     public void modifyGraph(NumericTestGraph<I, V, E> graph) {
@@ -134,8 +139,8 @@ public class TestBreadthFirstSearch {
     }
   }
 
-  public class Graph2Init<I extends WritableComparable, V extends Writable, E 
extends Writable>
-      implements TestGraphModifier<I, V, E> {
+  public class Graph2Init<I extends WritableComparable, V extends Writable,
+    E extends Writable> implements TestGraphModifier<I, V, E> {
 
     @Override
     public void modifyGraph(NumericTestGraph<I, V, E> graph) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/47da7518/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestMultiSeedBreadthFirstSearch.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestMultiSeedBreadthFirstSearch.java
 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestMultiSeedBreadthFirstSearch.java
new file mode 100644
index 0000000..c3aae07
--- /dev/null
+++ 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestMultiSeedBreadthFirstSearch.java
@@ -0,0 +1,127 @@
+package org.apache.giraph.block_app.library.algo;
+
+import org.apache.giraph.block_app.framework.BlockUtils;
+import org.apache.giraph.block_app.test_setup.NumericTestGraph;
+import org.apache.giraph.block_app.test_setup.TestGraphModifier;
+import org.apache.giraph.block_app.test_setup.TestGraphUtils;
+import org.apache.giraph.block_app.test_setup.graphs.Small1GraphInit;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestMultiSeedBreadthFirstSearch {
+  private void run(
+    TestGraphModifier<LongWritable, MultiSeedBreadthFirstSearchVertexValue,
+      NullWritable> graphLoader,
+    int[] expectedDistances,
+    String seedVertices
+  ) throws Exception {
+    TestGraphUtils.runTest(graphLoader, (graph) -> {
+      for (int i = 0; i < expectedDistances.length; i++) {
+        assertEquals(expectedDistances[i],
+          graph.getVertex(i).getValue().getDistance());
+      }
+    }, (conf) -> {
+      MultiSeedBreadthFirstSearchBlockFactory.SEED_LIST.set(conf, 
seedVertices);
+      BlockUtils.setBlockFactoryClass(conf,
+        MultiSeedBreadthFirstSearchBlockFactory.class);
+    });
+  }
+
+  @Test
+  public void testSmall1TwoSeeds() throws Exception {
+    int[] expected = {0, 1, 1, 1, 0, 1, -1};
+    String seeds = "0, 4";
+    run(new Small1GraphInit<>(), expected, seeds);
+  }
+
+  @Test
+  public void testSmallGraphTwoSeeds() throws Exception {
+    int[] expected = {0, 1, 2, 2, 2, 2, 3, 4, 5, 5, 5, 1, 2, 2, 2, 0};
+    String seeds = "0, 15";
+    run(new Graph1Init<>(), expected, seeds);
+  }
+
+  @Test
+  public void testSmallGraphTwoCloseSeeds() throws Exception {
+    int[] expected = {1, 0, 1, 0, 1, 1, 1, 2, 3, 3, 3, 2, 3, 3, 3, 3};
+    String seeds = "1, 3";
+    run(new Graph1Init<>(), expected, seeds);
+  }
+
+  @Test
+  public void testMultipleComponentGraphCloseSeeds() throws Exception {
+    int[] expected =
+      {2, 1, 0, 1, 2, 3, 3, 3, 2, 2, 2, 2, 1, 0, 2, -1, -1, -1, -1, -1, -1};
+    String seeds = "13, 2";
+    run(new Graph2Init(), expected, seeds);
+  }
+
+  @Test
+  public void testMultipleComponentGraphFarSeeds() throws Exception {
+    int[] expected =
+      {3, 2, 3, 2, 1, 0, 1, 2, 1, 2, 3, 3, 2, 3, 3, 3, 2, 1, 0, 1, -1};
+    String seeds = "5, 18";
+    run(new Graph2Init(), expected, seeds);
+  }
+
+
+  public class Graph1Init<I extends WritableComparable, V extends Writable,
+    E extends Writable>
+    implements TestGraphModifier<I, V, E> {
+
+    @Override
+    public void modifyGraph(NumericTestGraph<I, V, E> graph) {
+      graph.addVertex(0, (Number) null, null, 1);
+      graph.addVertex(1, (Number) null, null, 0,2,3,4,5);
+      graph.addVertex(2, (Number) null, null, 1,3,4,5);
+      graph.addVertex(3, (Number) null, null, 1,2,4,5,6);
+      graph.addVertex(4, (Number) null, null, 1,2,3,5);
+      graph.addVertex(5, (Number) null, null, 1,2,3,4,11);
+      graph.addVertex(6, (Number) null, null, 3,7);
+      graph.addVertex(7, (Number) null, null, 6,8,9,10);
+      graph.addVertex(8, (Number) null, null, 7,9,10);
+      graph.addVertex(9, (Number) null, null, 7,8,10);
+      graph.addVertex(10, (Number) null, null, 7,8,9);
+      graph.addVertex(11, (Number) null, null, 5,12,13,14,15);
+      graph.addVertex(12, (Number) null, null, 11);
+      graph.addVertex(13, (Number) null, null, 11);
+      graph.addVertex(14, (Number) null, null, 11);
+      graph.addVertex(15, (Number) null, null, 11);
+    }
+  }
+
+  public class Graph2Init<I extends WritableComparable, V extends Writable,
+    E extends Writable>
+    implements TestGraphModifier<I, V, E> {
+
+    @Override
+    public void modifyGraph(NumericTestGraph<I, V, E> graph) {
+      graph.addVertex(0, (Number) null, null, 1);
+      graph.addVertex(1, (Number) null, null, 0,2,3,4);
+      graph.addVertex(2, (Number) null, null, 1,3);
+      graph.addVertex(3, (Number) null, null, 2,4,9,10,11);
+      graph.addVertex(4, (Number) null, null, 1,3,5,6,7);
+      graph.addVertex(5, (Number) null, null, 4,6,5,8);
+      graph.addVertex(6, (Number) null, null, 4,5,7);
+      graph.addVertex(7, (Number) null, null, 4,5,6);
+      graph.addVertex(8, (Number) null, null, 5,9,12);
+      graph.addVertex(9, (Number) null, null, 3,8,10,11,12);
+      graph.addVertex(10, (Number) null, null, 3,9,11);
+      graph.addVertex(11, (Number) null, null, 3,9,10);
+      graph.addVertex(12, (Number) null, null, 8,9,13,14);
+      graph.addVertex(13, (Number) null, null, 12);
+      graph.addVertex(14, (Number) null, null, 12);
+      graph.addVertex(15, (Number) null, null, 16);
+      graph.addVertex(16, (Number) null, null, 15,17,19);
+      graph.addVertex(17, (Number) null, null, 16,18);
+      graph.addVertex(18, (Number) null, null, 17,19);
+      graph.addVertex(19, (Number) null, null, 16,18);
+      graph.addVertex(20);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/47da7518/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/VertexLongPair.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/VertexLongPair.java
 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/VertexLongPair.java
new file mode 100644
index 0000000..f1723a7
--- /dev/null
+++ 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/VertexLongPair.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.algo;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Pair of vertex and value. Used in MultiSeedBreadthFirstSearchBlockFactory
+ * for obtaining top N random vertices as seeds.
+ *
+ * @param <I> Vertex ID type
+ */
+public class VertexLongPair<I extends Writable>
+  implements Comparable<VertexLongPair<I>> {
+  private I vertex;
+  private long value;
+
+  public VertexLongPair(I vertex, long value) {
+    this.vertex = vertex;
+    this.value = value;
+  }
+
+  public I getVertex() {
+    return vertex;
+  }
+
+  public void setVertex(I vertex) {
+    this.vertex = vertex;
+  }
+
+  public long getValue() {
+    return value;
+  }
+
+  public void setValue(long value) {
+    this.value = value;
+  }
+
+  @Override
+  public int compareTo(VertexLongPair<I> other) {
+    return Long.compare(this.value, other.getValue());
+  }
+
+  @Override
+  public boolean equals(Object object) {
+    if (!(object instanceof VertexLongPair)) {
+      return false;
+    }
+    VertexLongPair<I> other = (VertexLongPair<I>) object;
+    return vertex == other.vertex;
+  }
+
+  @Override
+  public int hashCode() {
+    return vertex.hashCode();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/47da7518/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/TopNReduce.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/TopNReduce.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/TopNReduce.java
new file mode 100644
index 0000000..621c313
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/TopNReduce.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers;
+
+import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+/**
+ * Extracts top N largest elements.
+ *
+ * <p>The reduced value is a {@link PriorityQueue} in natural order, so its
+ * head is the smallest retained element. Once the queue holds
+ * {@code capacity} elements, a new element replaces the head only when it is
+ * strictly larger than it.
+ *
+ * @param <S> Single value type, objects passed on workers
+ */
+public class TopNReduce<S extends Comparable<S>>
+  extends KryoWrappedReduceOperation<S, PriorityQueue<S>> {
+  /** Maximum number of elements to retain (N) */
+  private int capacity;
+
+  /**
+   * Constructor
+   *
+   * @param capacity Maximum number of elements to retain; non-positive
+   *                 values retain nothing
+   */
+  public TopNReduce(int capacity) {
+    this.capacity = capacity;
+  }
+
+  /**
+   * Constructor leaving the capacity at 0; {@link #readFields} overwrites
+   * it.
+   */
+  public TopNReduce() { }
+
+  /**
+   * @return New empty queue ordered by the natural ordering of its elements
+   */
+  @Override
+  public PriorityQueue<S> createValue() {
+    return new PriorityQueue<S>();
+  }
+
+  /**
+   * Offers a single value to the queue of retained elements.
+   *
+   * @param heap Queue holding the elements retained so far
+   * @param value Value to consider
+   */
+  @Override
+  public void reduce(PriorityQueue<S> heap, S value) {
+    // A non-positive capacity retains nothing. Checking "<= 0" rather than
+    // "== 0" keeps a negative capacity from reaching the replacement branch
+    // below, where peek() on an empty queue returns null.
+    if (capacity <= 0) {
+      return;
+    }
+
+    if (heap.size() < capacity) {
+      heap.add(value);
+    } else {
+      // Queue is full and non-empty: evict the smallest retained element
+      // only if the new value is strictly larger.
+      S head = heap.peek();
+      if (head.compareTo(value) < 0) {
+        heap.poll();
+        heap.add(value);
+      }
+    }
+  }
+
+  /**
+   * Merges one partial result into another by offering each of its elements
+   * in turn.
+   *
+   * @param reduceInto Queue receiving the merged result
+   * @param toReduce Queue whose elements are offered; it is not modified
+   */
+  @Override
+  public void reduceMerge(
+    PriorityQueue<S> reduceInto,
+    PriorityQueue<S> toReduce
+  ) {
+    for (S element : toReduce) {
+      reduce(reduceInto, element);
+    }
+  }
+
+  /**
+   * Writes the superclass state followed by the capacity.
+   *
+   * @param out Output to write to
+   * @throws IOException If writing fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeInt(capacity);
+  }
+
+  /**
+   * Reads the superclass state followed by the capacity, mirroring
+   * {@link #write}.
+   *
+   * @param in Input to read from
+   * @throws IOException If reading fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    capacity = in.readInt();
+  }
+}

Reply via email to