Repository: giraph
Updated Branches:
  refs/heads/trunk 1c7552b1a -> 0b1962253


GIRAPH-1060: Add combiner to connected components

Summary: Connected components should use a combiner to make them more efficient
and require less memory. A few additional cleanups were made while at it.

Test Plan: mvn clean verify

Differential Revision: https://reviews.facebook.net/D57879


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/0b196225
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/0b196225
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/0b196225

Branch: refs/heads/trunk
Commit: 0b1962253c8cd3e2194b1b0015c4352277c1ce0a
Parents: 1c7552b
Author: Maja Kabiljo <[email protected]>
Authored: Mon May 9 11:07:48 2016 -0700
Committer: Maja Kabiljo <[email protected]>
Committed: Mon May 9 11:22:35 2016 -0700

----------------------------------------------------------------------
 .../UndirectedConnectedComponents.java          | 60 ++++++++++----------
 .../giraph/combiner/MinMessageCombiner.java     | 55 ++++++++++++++++++
 .../apache/giraph/types/ops/ByteTypeOps.java    |  5 ++
 .../apache/giraph/types/ops/DoubleTypeOps.java  |  5 ++
 .../apache/giraph/types/ops/FloatTypeOps.java   |  5 ++
 .../org/apache/giraph/types/ops/IntTypeOps.java |  5 ++
 .../apache/giraph/types/ops/LongTypeOps.java    |  5 ++
 .../apache/giraph/types/ops/NumericTypeOps.java | 10 ++++
 giraph-core/templates/TypeTypeOps.java          |  5 ++
 9 files changed, 126 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/0b196225/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java
index 2610436..fb04fa8 100644
--- a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java
+++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java
@@ -34,6 +34,7 @@ import org.apache.giraph.block_app.library.SendMessageChain;
 import org.apache.giraph.block_app.library.VertexSuppliers;
 import org.apache.giraph.block_app.reducers.map.BasicMapReduce;
 import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.combiner.MinMessageCombiner;
 import org.apache.giraph.combiner.SumMessageCombiner;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
@@ -47,6 +48,7 @@ import org.apache.giraph.reducers.impl.MaxPairReducer;
 import org.apache.giraph.reducers.impl.SumReduce;
 import org.apache.giraph.types.NoMessage;
 import org.apache.giraph.types.ops.LongTypeOps;
+import org.apache.giraph.types.ops.NumericTypeOps;
 import org.apache.giraph.types.ops.TypeOps;
 import org.apache.giraph.writable.tuple.LongLongWritable;
 import org.apache.giraph.writable.tuple.PairWritable;
@@ -55,7 +57,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 
 /**
@@ -98,6 +99,7 @@ public class UndirectedConnectedComponents {
       <I extends WritableComparable, V extends Writable>
       extends Piece<I, V, Writable, I, Object> {
     private final TypeOps<I> idTypeOps;
+    private final MinMessageCombiner<I, I> minMessageCombiner;
     private final Supplier<Boolean> vertexToPropagate;
     private final Consumer<Boolean> vertexUpdatedComponent;
     private final Consumer<Boolean> converged;
@@ -118,6 +120,8 @@ public class UndirectedConnectedComponents {
         SupplierFromVertex<I, V, Writable,
           ? extends Iterable<? extends Edge<I, ?>>> edgeSupplier) {
       this.idTypeOps = idTypeOps;
+      this.minMessageCombiner = idTypeOps instanceof NumericTypeOps ?
+          new MinMessageCombiner<>((NumericTypeOps<I>) idTypeOps) : null;
       this.vertexToPropagate = vertexToPropagate;
       this.vertexUpdatedComponent = vertexUpdatedComponent;
       this.converged = converged;
@@ -137,22 +141,14 @@ public class UndirectedConnectedComponents {
         final BlockWorkerSendApi<I, V, Writable, I> workerApi,
         Object executionStage) {
       final LongWritable one = new LongWritable(1);
-      return new InnerVertexSender() {
-        @Override
-        public void vertexSend(Vertex<I, V, Writable> vertex) {
-          if (vertexToPropagate.get()) {
-            workerApi.sendMessageToMultipleEdges(
-                Iterators.transform(
-                    edgeSupplier.get(vertex).iterator(),
-                    new Function<Edge<I, ?>, I>() {
-                      @Override
-                      public I apply(Edge<I, ?> edge) {
-                        return edge.getTargetVertexId();
-                      }
-                    }),
-                getComponent.get(vertex));
-            propagatedAggregator.reduce(one);
-          }
+      return vertex -> {
+        if (vertexToPropagate.get()) {
+          workerApi.sendMessageToMultipleEdges(
+              Iterators.transform(
+                  edgeSupplier.get(vertex).iterator(),
+                  edge -> edge.getTargetVertexId()),
+              getComponent.get(vertex));
+          propagatedAggregator.reduce(one);
         }
       };
     }
@@ -169,26 +165,21 @@ public class UndirectedConnectedComponents {
     public VertexReceiver<I, V, Writable, I> getVertexReceiver(
         BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
       return new InnerVertexReceiver() {
-        private final I received = idTypeOps.create();
+        private final I newComponent = idTypeOps.create();
 
         @Override
         public void vertexReceive(Vertex<I, V, Writable> vertex,
             Iterable<I> messages) {
-          boolean first = true;
+          idTypeOps.set(newComponent, getComponent.get(vertex));
           for (I value : messages) {
-            if (first) {
-              idTypeOps.set(received, value);
-              first = false;
-            } else {
-              if (received.compareTo(value) > 0) {
-                idTypeOps.set(received, value);
-              }
+            if (newComponent.compareTo(value) > 0) {
+              idTypeOps.set(newComponent, value);
             }
           }
 
           I cur = getComponent.get(vertex);
-          if (!first && cur.compareTo(received) > 0) {
-            setComponent.apply(vertex, received);
+          if (cur.compareTo(newComponent) > 0) {
+            setComponent.apply(vertex, newComponent);
             vertexUpdatedComponent.apply(true);
           } else {
             vertexUpdatedComponent.apply(false);
@@ -198,8 +189,19 @@ public class UndirectedConnectedComponents {
     }
 
     @Override
+    public MessageCombiner<? super I, I> getMessageCombiner(
+        ImmutableClassesGiraphConfiguration conf) {
+      return minMessageCombiner;
+    }
+
+    @Override
     public Class<I> getMessageClass() {
-      return idTypeOps.getTypeClass();
+      return minMessageCombiner == null ? idTypeOps.getTypeClass() : null;
+    }
+
+    @Override
+    protected boolean allowOneMessageToManyIdsEncoding() {
+      return true;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/0b196225/giraph-core/src/main/java/org/apache/giraph/combiner/MinMessageCombiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/MinMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/MinMessageCombiner.java
new file mode 100644
index 0000000..3454366
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/combiner/MinMessageCombiner.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.combiner;
+
+import org.apache.giraph.types.ops.NumericTypeOps;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Keeps only the message with minimum value.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message
+ */
+public class MinMessageCombiner<I extends WritableComparable,
+    M extends Writable> implements MessageCombiner<I, M> {
+  /** Numeric type ops for the value to combine */
+  private final NumericTypeOps<M> numTypeOps;
+
+  /**
+   * Combiner
+   *
+   * @param numTypeOps Type ops to use
+   */
+  public MinMessageCombiner(NumericTypeOps<M> numTypeOps) {
+    this.numTypeOps = numTypeOps;
+  }
+
+  @Override
+  public void combine(I vertexId, M originalMessage, M messageToCombine) {
+    if (numTypeOps.compare(originalMessage, messageToCombine) > 0) {
+      numTypeOps.set(originalMessage, messageToCombine);
+    }
+  }
+
+  @Override
+  public M createInitialMessage() {
+    return this.numTypeOps.createMaxPositiveValue();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/0b196225/giraph-core/src/main/java/org/apache/giraph/types/ops/ByteTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/ByteTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/ByteTypeOps.java
index 6499c2b..71a30da 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/ByteTypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/ByteTypeOps.java
@@ -101,4 +101,9 @@ public enum ByteTypeOps implements
   public void negate(ByteWritable value) {
     value.set((byte) (-value.get()));
   }
+
+  @Override
+  public int compare(ByteWritable value1, ByteWritable value2) {
+    return Byte.compare(value1.get(), value2.get());
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/0b196225/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java
index f549208..89e50b1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java
@@ -101,4 +101,9 @@ public enum DoubleTypeOps implements
   public void negate(DoubleWritable value) {
     value.set(-value.get());
   }
+
+  @Override
+  public int compare(DoubleWritable value1, DoubleWritable value2) {
+    return Double.compare(value1.get(), value2.get());
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/0b196225/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java
index cf970c0..64279ce 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java
@@ -101,4 +101,9 @@ public enum FloatTypeOps implements
   public void negate(FloatWritable value) {
     value.set(-value.get());
   }
+
+  @Override
+  public int compare(FloatWritable value1, FloatWritable value2) {
+    return Float.compare(value1.get(), value2.get());
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/0b196225/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
index 943637c..a03cf94 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
@@ -126,4 +126,9 @@ public enum IntTypeOps implements
   public void negate(IntWritable value) {
     value.set(-value.get());
   }
+
+  @Override
+  public int compare(IntWritable value1, IntWritable value2) {
+    return Integer.compare(value1.get(), value2.get());
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/0b196225/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
index 2e3c8e7..916d166 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
@@ -126,4 +126,9 @@ public enum LongTypeOps implements
   public void negate(LongWritable value) {
     value.set(-value.get());
   }
+
+  @Override
+  public int compare(LongWritable value1, LongWritable value2) {
+    return Long.compare(value1.get(), value2.get());
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/0b196225/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java
index 6420a1b..a9786a9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java
@@ -71,4 +71,14 @@ public interface NumericTypeOps<T> extends TypeOps<T> {
    * @param value Value to negate
    */
   void negate(T value);
+
+  /**
+   * Compare two values
+   *
+   * @param value1 First value
+   * @param value2 Second value
+   * @return 0 if values are equal, negative value if value1<value2 and
+   *         positive value if value1>value2
+   */
+  int compare(T value1, T value2);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/0b196225/giraph-core/templates/TypeTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/templates/TypeTypeOps.java b/giraph-core/templates/TypeTypeOps.java
index 8d92e8a..bba752d 100644
--- a/giraph-core/templates/TypeTypeOps.java
+++ b/giraph-core/templates/TypeTypeOps.java
@@ -143,5 +143,10 @@ public enum ${type.camel}TypeOps implements
   public void negate(${type.camel}Writable value) {
     value.set(<@cast_if_needed_e expr="-value.get()"/>);
   }
+
+  @Override
+  public int compare(${type.camel}Writable value1, ${type.camel}Writable value2) {
+    return ${type.boxed}.compare(value1.get(), value2.get());
+  }
 </#if>
 }

Reply via email to