Author: edwardyoon
Date: Thu Jan 23 01:51:44 2014
New Revision: 1560570
URL: http://svn.apache.org/r1560570
Log:
HAMA-857: Graph Combiners is wrongly implemented (edwardyoon)
Added:
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
(with props)
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1560570&r1=1560569&r2=1560570&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Jan 23 01:51:44 2014
@@ -14,6 +14,7 @@ Release 0.7.0 (unreleased changes)
BUG FIXES
+ HAMA-857: Graph Combiners is wrongly implemented (edwardyoon)
HAMA-845: The size() of Spilling Queue returns always numMessagesWritten
(edwardyoon)
HAMA-834: Fix KMeans example (Martin Illecker)
HAMA-831: Support for multi records with same vertexID (edwardyoon)
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1560570&r1=1560569&r2=1560570&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Jan 23 01:51:44 2014
@@ -59,7 +59,7 @@ public final class BSPPeerImpl<K1, V1, K
private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
public static enum PeerCounter {
- COMPRESSED_MESSAGES, SUPERSTEP_SUM, TASK_INPUT_RECORDS,
TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED,
MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED,
COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
+ COMPRESSED_MESSAGES, SUPERSTEP_SUM, TASK_INPUT_RECORDS,
TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED,
MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED,
TOTAL_MESSAGES_COMBINED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED,
TIME_IN_SYNC_MS
}
private final HamaConfiguration conf;
@@ -459,6 +459,12 @@ public final class BSPPeerImpl<K1, V1, K
}
public final void close() {
+ long combinedMessages = this.getCounter(PeerCounter.TOTAL_MESSAGES_SENT)
+ .getCounter()
+ - this.getCounter(PeerCounter.TOTAL_MESSAGES_RECEIVED).getCounter();
+ this.getCounter(PeerCounter.TOTAL_MESSAGES_COMBINED).increment(
+ combinedMessages);
+
// there are many catches, because we want to close always every component
// even if the one before failed.
if (in != null) {
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1560570&r1=1560569&r2=1560570&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Thu Jan 23 01:51:44 2014
@@ -139,6 +139,8 @@ public abstract class AbstractMessageMan
*/
@Override
public final void clearOutgoingMessages() {
+ outgoingMessageManager.clear();
+
if (conf.getBoolean(MessageQueue.PERSISTENT_QUEUE, false)
&& localQueue.size() > 0) {
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1560570&r1=1560569&r2=1560570&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu Jan 23 01:51:44 2014
@@ -31,6 +31,7 @@ import org.apache.hama.bsp.HashPartition
import org.apache.hama.bsp.Partitioner;
import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.OutgoingMessageManager;
import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.bsp.message.queue.SortedMemoryQueue;
@@ -57,6 +58,9 @@ public class GraphJob extends BSPJob {
public GraphJob(HamaConfiguration conf, Class<?> exampleClass)
throws IOException {
super(conf);
+ conf.setClass(MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
+ OutgoingVertexMessagesManager.class, OutgoingMessageManager.class);
+
this.setBoolean(Constants.PARTITION_SORT_BY_KEY, true);
this.setBspClass(GraphJobRunner.class);
this.setJarByClass(exampleClass);
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1560570&r1=1560569&r2=1560570&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Thu Jan 23 01:51:44 2014
@@ -156,7 +156,8 @@ public final class GraphJobMessage imple
return map;
}
- public Writable getVertexId() {
+ @SuppressWarnings("rawtypes")
+ public WritableComparable getVertexId() {
return vertexId;
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1560570&r1=1560569&r2=1560570&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Jan 23 01:51:44 2014
@@ -34,7 +34,6 @@ import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.Partitioner;
import org.apache.hama.bsp.PartitioningRunner.DefaultRecordConverter;
@@ -83,7 +82,6 @@ public final class GraphJobRunner<V exte
public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class";
private HamaConfiguration conf;
- private Combiner<M> combiner;
private Partitioner<V, M> partitioner;
public static Class<?> VERTEX_CLASS;
@@ -234,7 +232,7 @@ public final class GraphJobRunner<V exte
IDSkippingIterator<V, E, M> iterator = vertices.skippingIterator();
VertexMessageIterable<V, M> iterable = null;
Vertex<V, E, M> vertex = null;
-
+
// note that can't skip inactive vertices because we have to rewrite the
// complete vertex file in each iteration
while (iterator.hasNext(
@@ -255,12 +253,7 @@ public final class GraphJobRunner<V exte
if (iterable == null) {
vertex.compute(Collections.<M> emptyList());
} else {
- if (combiner != null) {
- M combined = combiner.combine(iterable);
- vertex.compute(Collections.singleton(combined));
- } else {
- vertex.compute(iterable);
- }
+ vertex.compute(iterable);
currentMessage = iterable.getOverflowMessage();
}
activeVertices++;
@@ -358,15 +351,6 @@ public final class GraphJobRunner<V exte
conf.getClass("bsp.input.partitioner.class",
HashPartitioner.class),
conf);
- if (!conf.getClass(MESSAGE_COMBINER_CLASS_KEY, Combiner.class).equals(
- Combiner.class)) {
- LOG.debug("Combiner class: " + conf.get(MESSAGE_COMBINER_CLASS_KEY));
-
- combiner = (Combiner<M>) org.apache.hadoop.util.ReflectionUtils
- .newInstance(conf.getClass("hama.vertex.message.combiner.class",
- Combiner.class), conf);
- }
-
Class<?> outputWriter = conf.getClass(
GraphJob.VERTEX_OUTPUT_WRITER_CLASS_ATTR, VertexOutputWriter.class);
vertexOutputWriter = (VertexOutputWriter<Writable, Writable, V, E, M>)
ReflectionUtils
Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java?rev=1560570&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java Thu Jan 23 01:51:44 2014
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.Combiner;
+import org.apache.hama.bsp.message.OutgoingMessageManager;
+import org.apache.hama.util.BSPNetUtils;
+
+public class OutgoingVertexMessagesManager<M extends Writable> implements
+ OutgoingMessageManager<GraphJobMessage> {
+ protected static final Log LOG = LogFactory
+ .getLog(OutgoingVertexMessagesManager.class);
+
+ private Combiner<Writable> combiner;
+ private final HashMap<String, InetSocketAddress> peerSocketCache = new
HashMap<String, InetSocketAddress>();
+ private HashMap<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>
outgoingBundles = new HashMap<InetSocketAddress,
BSPMessageBundle<GraphJobMessage>>();
+
+ @SuppressWarnings("rawtypes")
+ private HashMap<InetSocketAddress, Map<WritableComparable, Writable>>
vertexMessageMap = new HashMap<InetSocketAddress, Map<WritableComparable,
Writable>>();
+ private List<Writable> tmp;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(Configuration conf) {
+ if (!conf.getClass(GraphJobRunner.MESSAGE_COMBINER_CLASS_KEY,
+ Combiner.class).equals(Combiner.class)) {
+ LOG.debug("Combiner class: "
+ + conf.get(GraphJobRunner.MESSAGE_COMBINER_CLASS_KEY));
+
+ combiner = (Combiner<Writable>) org.apache.hadoop.util.ReflectionUtils
+ .newInstance(conf.getClass("hama.vertex.message.combiner.class",
+ Combiner.class), conf);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void addMessage(String peerName, GraphJobMessage msg) {
+ InetSocketAddress targetPeerAddress = getSocketAddress(peerName);
+
+ if (msg.isVertexMessage() && combiner != null) {
+ WritableComparable vertexID = msg.getVertexId();
+ Writable vertexValue = msg.getVertexValue();
+
+ if (!vertexMessageMap.containsKey(targetPeerAddress)) {
+ vertexMessageMap.put(targetPeerAddress,
+ new HashMap<WritableComparable, Writable>());
+ }
+
+ Map<WritableComparable, Writable> combinedMessage = vertexMessageMap
+ .get(targetPeerAddress);
+
+ if (combinedMessage.containsKey(vertexID)) {
+ tmp = new ArrayList<Writable>();
+ tmp.add(combinedMessage.get(vertexID));
+ tmp.add(vertexValue);
+
+ Iterable<Writable> iterable = new Iterable<Writable>() {
+ @Override
+ public Iterator<Writable> iterator() {
+ return tmp.iterator();
+ }
+ };
+
+ combinedMessage.put(vertexID, combiner.combine(iterable));
+ } else {
+ combinedMessage.put(vertexID, vertexValue);
+ }
+
+ } else {
+ outgoingBundles.get(targetPeerAddress).addMessage(msg);
+ }
+ }
+
+ private InetSocketAddress getSocketAddress(String peerName) {
+ InetSocketAddress targetPeerAddress = null;
+ // Get socket for target peer.
+ if (peerSocketCache.containsKey(peerName)) {
+ targetPeerAddress = peerSocketCache.get(peerName);
+ } else {
+ targetPeerAddress = BSPNetUtils.getAddress(peerName);
+ peerSocketCache.put(peerName, targetPeerAddress);
+ }
+
+ if (!outgoingBundles.containsKey(targetPeerAddress)) {
+ outgoingBundles.put(targetPeerAddress,
+ new BSPMessageBundle<GraphJobMessage>());
+ }
+ return targetPeerAddress;
+ }
+
+ @Override
+ public void clear() {
+ outgoingBundles.clear();
+ vertexMessageMap.clear();
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Iterator<Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>>
getBundleIterator() {
+ if (combiner != null) {
+ for (Map.Entry<InetSocketAddress, Map<WritableComparable, Writable>> e :
vertexMessageMap
+ .entrySet()) {
+ for (Map.Entry<WritableComparable, Writable> v : e.getValue()
+ .entrySet()) {
+ outgoingBundles.get(e.getKey()).addMessage(
+ new GraphJobMessage(v.getKey(), v.getValue()));
+ }
+ }
+ }
+
+ vertexMessageMap.clear();
+ return outgoingBundles.entrySet().iterator();
+ }
+}
Propchange: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
------------------------------------------------------------------------------
svn:eol-style = native