Author: edwardyoon
Date: Thu Jan 23 01:51:44 2014
New Revision: 1560570

URL: http://svn.apache.org/r1560570
Log:
HAMA-857: Graph Combiners is wrongly implemented (edwardyoon)

Added:
    
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
   (with props)
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: hama/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1560570&r1=1560569&r2=1560570&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Jan 23 01:51:44 2014
@@ -14,6 +14,7 @@ Release 0.7.0 (unreleased changes)
 
   BUG FIXES
 
+   HAMA-857: Graph Combiners is wrongly implemented (edwardyoon)
    HAMA-845: The size() of Spilling Queue returns always numMessagesWritten 
(edwardyoon)
    HAMA-834: Fix KMeans example (Martin Illecker)
    HAMA-831: Support for multi records with same vertexID (edwardyoon)

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1560570&r1=1560569&r2=1560570&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java 
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Jan 
23 01:51:44 2014
@@ -59,7 +59,7 @@ public final class BSPPeerImpl<K1, V1, K
   private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
   public static enum PeerCounter {
-    COMPRESSED_MESSAGES, SUPERSTEP_SUM, TASK_INPUT_RECORDS, 
TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, 
MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, 
COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
+    COMPRESSED_MESSAGES, SUPERSTEP_SUM, TASK_INPUT_RECORDS, 
TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, 
MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, 
TOTAL_MESSAGES_COMBINED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, 
TIME_IN_SYNC_MS
   }
 
   private final HamaConfiguration conf;
@@ -459,6 +459,12 @@ public final class BSPPeerImpl<K1, V1, K
   }
 
   public final void close() {
+    long combinedMessages = this.getCounter(PeerCounter.TOTAL_MESSAGES_SENT)
+        .getCounter()
+        - this.getCounter(PeerCounter.TOTAL_MESSAGES_RECEIVED).getCounter();
+    this.getCounter(PeerCounter.TOTAL_MESSAGES_COMBINED).increment(
+        combinedMessages);
+
     // there are many catches, because we want to close always every component
     // even if the one before failed.
     if (in != null) {

Modified: 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1560570&r1=1560569&r2=1560570&view=diff
==============================================================================
--- 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
 (original)
+++ 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
 Thu Jan 23 01:51:44 2014
@@ -139,6 +139,8 @@ public abstract class AbstractMessageMan
    */
   @Override
   public final void clearOutgoingMessages() {
+    outgoingMessageManager.clear();
+
     if (conf.getBoolean(MessageQueue.PERSISTENT_QUEUE, false)
         && localQueue.size() > 0) {
 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1560570&r1=1560569&r2=1560570&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java 
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu Jan 
23 01:51:44 2014
@@ -31,6 +31,7 @@ import org.apache.hama.bsp.HashPartition
 import org.apache.hama.bsp.Partitioner;
 import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
 import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.OutgoingMessageManager;
 import org.apache.hama.bsp.message.queue.MessageQueue;
 import org.apache.hama.bsp.message.queue.SortedMemoryQueue;
 
@@ -57,6 +58,9 @@ public class GraphJob extends BSPJob {
   public GraphJob(HamaConfiguration conf, Class<?> exampleClass)
       throws IOException {
     super(conf);
+    conf.setClass(MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
+        OutgoingVertexMessagesManager.class, OutgoingMessageManager.class);
+    
     this.setBoolean(Constants.PARTITION_SORT_BY_KEY, true);
     this.setBspClass(GraphJobRunner.class);
     this.setJarByClass(exampleClass);

Modified: 
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1560570&r1=1560569&r2=1560570&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java 
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java 
Thu Jan 23 01:51:44 2014
@@ -156,7 +156,8 @@ public final class GraphJobMessage imple
     return map;
   }
 
-  public Writable getVertexId() {
+  @SuppressWarnings("rawtypes")
+  public WritableComparable getVertexId() {
     return vertexId;
   }
 

Modified: 
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1560570&r1=1560569&r2=1560570&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java 
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java 
Thu Jan 23 01:51:44 2014
@@ -34,7 +34,6 @@ import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.Partitioner;
 import org.apache.hama.bsp.PartitioningRunner.DefaultRecordConverter;
@@ -83,7 +82,6 @@ public final class GraphJobRunner<V exte
   public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class";
 
   private HamaConfiguration conf;
-  private Combiner<M> combiner;
   private Partitioner<V, M> partitioner;
 
   public static Class<?> VERTEX_CLASS;
@@ -234,7 +232,7 @@ public final class GraphJobRunner<V exte
     IDSkippingIterator<V, E, M> iterator = vertices.skippingIterator();
     VertexMessageIterable<V, M> iterable = null;
     Vertex<V, E, M> vertex = null;
-    
+
     // note that can't skip inactive vertices because we have to rewrite the
     // complete vertex file in each iteration
     while (iterator.hasNext(
@@ -255,12 +253,7 @@ public final class GraphJobRunner<V exte
         if (iterable == null) {
           vertex.compute(Collections.<M> emptyList());
         } else {
-          if (combiner != null) {
-            M combined = combiner.combine(iterable);
-            vertex.compute(Collections.singleton(combined));
-          } else {
-            vertex.compute(iterable);
-          }
+          vertex.compute(iterable);
           currentMessage = iterable.getOverflowMessage();
         }
         activeVertices++;
@@ -358,15 +351,6 @@ public final class GraphJobRunner<V exte
             conf.getClass("bsp.input.partitioner.class", 
HashPartitioner.class),
             conf);
 
-    if (!conf.getClass(MESSAGE_COMBINER_CLASS_KEY, Combiner.class).equals(
-        Combiner.class)) {
-      LOG.debug("Combiner class: " + conf.get(MESSAGE_COMBINER_CLASS_KEY));
-
-      combiner = (Combiner<M>) org.apache.hadoop.util.ReflectionUtils
-          .newInstance(conf.getClass("hama.vertex.message.combiner.class",
-              Combiner.class), conf);
-    }
-
     Class<?> outputWriter = conf.getClass(
         GraphJob.VERTEX_OUTPUT_WRITER_CLASS_ATTR, VertexOutputWriter.class);
     vertexOutputWriter = (VertexOutputWriter<Writable, Writable, V, E, M>) 
ReflectionUtils

Added: 
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java?rev=1560570&view=auto
==============================================================================
--- 
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
 (added)
+++ 
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
 Thu Jan 23 01:51:44 2014
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.Combiner;
+import org.apache.hama.bsp.message.OutgoingMessageManager;
+import org.apache.hama.util.BSPNetUtils;
+
+public class OutgoingVertexMessagesManager<M extends Writable> implements
+    OutgoingMessageManager<GraphJobMessage> {
+  protected static final Log LOG = LogFactory
+      .getLog(OutgoingVertexMessagesManager.class);
+
+  private Combiner<Writable> combiner;
+  private final HashMap<String, InetSocketAddress> peerSocketCache = new 
HashMap<String, InetSocketAddress>();
+  private HashMap<InetSocketAddress, BSPMessageBundle<GraphJobMessage>> 
outgoingBundles = new HashMap<InetSocketAddress, 
BSPMessageBundle<GraphJobMessage>>();
+
+  @SuppressWarnings("rawtypes")
+  private HashMap<InetSocketAddress, Map<WritableComparable, Writable>> 
vertexMessageMap = new HashMap<InetSocketAddress, Map<WritableComparable, 
Writable>>();
+  private List<Writable> tmp;
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void init(Configuration conf) {
+    if (!conf.getClass(GraphJobRunner.MESSAGE_COMBINER_CLASS_KEY,
+        Combiner.class).equals(Combiner.class)) {
+      LOG.debug("Combiner class: "
+          + conf.get(GraphJobRunner.MESSAGE_COMBINER_CLASS_KEY));
+
+      combiner = (Combiner<Writable>) org.apache.hadoop.util.ReflectionUtils
+          .newInstance(conf.getClass("hama.vertex.message.combiner.class",
+              Combiner.class), conf);
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void addMessage(String peerName, GraphJobMessage msg) {
+    InetSocketAddress targetPeerAddress = getSocketAddress(peerName);
+
+    if (msg.isVertexMessage() && combiner != null) {
+      WritableComparable vertexID = msg.getVertexId();
+      Writable vertexValue = msg.getVertexValue();
+
+      if (!vertexMessageMap.containsKey(targetPeerAddress)) {
+        vertexMessageMap.put(targetPeerAddress,
+            new HashMap<WritableComparable, Writable>());
+      }
+
+      Map<WritableComparable, Writable> combinedMessage = vertexMessageMap
+          .get(targetPeerAddress);
+
+      if (combinedMessage.containsKey(vertexID)) {
+        tmp = new ArrayList<Writable>();
+        tmp.add(combinedMessage.get(vertexID));
+        tmp.add(vertexValue);
+
+        Iterable<Writable> iterable = new Iterable<Writable>() {
+          @Override
+          public Iterator<Writable> iterator() {
+            return tmp.iterator();
+          }
+        };
+
+        combinedMessage.put(vertexID, combiner.combine(iterable));
+      } else {
+        combinedMessage.put(vertexID, vertexValue);
+      }
+
+    } else {
+      outgoingBundles.get(targetPeerAddress).addMessage(msg);
+    }
+  }
+
+  private InetSocketAddress getSocketAddress(String peerName) {
+    InetSocketAddress targetPeerAddress = null;
+    // Get socket for target peer.
+    if (peerSocketCache.containsKey(peerName)) {
+      targetPeerAddress = peerSocketCache.get(peerName);
+    } else {
+      targetPeerAddress = BSPNetUtils.getAddress(peerName);
+      peerSocketCache.put(peerName, targetPeerAddress);
+    }
+
+    if (!outgoingBundles.containsKey(targetPeerAddress)) {
+      outgoingBundles.put(targetPeerAddress,
+          new BSPMessageBundle<GraphJobMessage>());
+    }
+    return targetPeerAddress;
+  }
+
+  @Override
+  public void clear() {
+    outgoingBundles.clear();
+    vertexMessageMap.clear();
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Iterator<Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>> 
getBundleIterator() {
+    if (combiner != null) {
+      for (Map.Entry<InetSocketAddress, Map<WritableComparable, Writable>> e : 
vertexMessageMap
+          .entrySet()) {
+        for (Map.Entry<WritableComparable, Writable> v : e.getValue()
+            .entrySet()) {
+          outgoingBundles.get(e.getKey()).addMessage(
+              new GraphJobMessage(v.getKey(), v.getValue()));
+        }
+      }
+    }
+
+    vertexMessageMap.clear();
+    return outgoingBundles.entrySet().iterator();
+  }
+}

Propchange: 
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to