Author: edwardyoon
Date: Tue Feb 18 14:27:51 2014
New Revision: 1569340
URL: http://svn.apache.org/r1569340
Log:
HAMA-868: Fix SemiClustering bugs (edwardyoon)
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterDetails.java
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterMessage.java
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1569340&r1=1569339&r2=1569340&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Tue Feb 18 14:27:51 2014
@@ -15,6 +15,7 @@ Release 0.7.0 (unreleased changes)
BUG FIXES
+ HAMA-868: Fix SemiClustering bugs (edwardyoon)
HAMA-871: NPE when building core module due to gmaven-plugin (Martin
Illecker)
HAMA-867: HAMA 0.7 doesn't work with HDFS 2X due to lack of libs (Skater Xu
via edwardyoon)
HAMA-862: Handling max tasks exception (edwardyoon)
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1569340&r1=1569339&r2=1569340&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Tue
Feb 18 14:27:51 2014
@@ -52,6 +52,7 @@ public class BSPMessageBundle<M extends
public BSPMessageBundle() {
bos = new ByteArrayOutputStream();
dos = new DataOutputStream(bos);
+
bundleSize = 0;
}
@@ -75,9 +76,10 @@ public class BSPMessageBundle<M extends
}
public Iterator<M> iterator() {
+ bis = new ByteArrayInputStream(bos.toByteArray());
+ dis = new DataInputStream(bis);
+
Iterator<M> it = new Iterator<M>() {
- ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
- DataInputStream dis = new DataInputStream(bis);
M msg;
@Override
@@ -86,6 +88,11 @@ public class BSPMessageBundle<M extends
if (dis.available() > 0) {
return true;
} else {
+ dos = null;
+ bos = null;
+ bis = null;
+ dis = null;
+
return false;
}
} catch (IOException e) {
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1569340&r1=1569339&r2=1569340&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue Feb
18 14:27:51 2014
@@ -374,7 +374,7 @@ public final class BSPPeerImpl<K1, V1, K
final InetSocketAddress addr = entry.getKey();
final BSPMessageBundle<M> bundle = entry.getValue();
-
+
// remove this message during runtime to save a bit of memory
it.remove();
try {
Modified:
hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java?rev=1569340&r1=1569339&r2=1569340&view=diff
==============================================================================
---
hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java
(original)
+++
hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java
Tue Feb 18 14:27:51 2014
@@ -185,6 +185,7 @@ public class SemiClusterMatchingTest ext
Map<String, List<String>> mpOutPutCluser = outputClusterLoader();
Iterator it = mpOutPutCluser.entrySet().iterator();
while (it.hasNext()) {
+ System.out.println(it.next());
flag = true;
Map.Entry pairs = (Map.Entry) it.next();
List<String> valFromMap = new ArrayList<String>();
@@ -220,8 +221,6 @@ public class SemiClusterMatchingTest ext
@Test
public void testSemiClustering() throws IOException, InterruptedException,
ClassNotFoundException {
- /* FIXME HAMA-868
-
generateTestData();
try {
@@ -260,7 +259,6 @@ public class SemiClusterMatchingTest ext
} finally {
deleteTempDirs();
}
- */
}
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1569340&r1=1569339&r2=1569340&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Tue Feb 18
14:27:51 2014
@@ -17,12 +17,8 @@
*/
package org.apache.hama.graph;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -371,19 +367,6 @@ public abstract class Vertex<V extends W
return runner;
}
- public Vertex<V, E, M> deepCopy() throws IOException {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- this.write(dos);
-
- ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
- DataInputStream dis = new DataInputStream(bis);
-
- Vertex<V, E, M> vertex = GraphJobRunner.<V, E, M>
newVertexInstance(GraphJobRunner.VERTEX_CLASS);
- vertex.readFields(dis);
- return vertex;
- }
-
@Override
public void aggregate(int index, M value) throws IOException {
this.runner.getAggregationRunner().aggregateVertex(index, oldValue, value);
Modified:
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterDetails.java
URL:
http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterDetails.java?rev=1569340&r1=1569339&r2=1569340&view=diff
==============================================================================
---
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterDetails.java
(original)
+++
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterDetails.java
Tue Feb 18 14:27:51 2014
@@ -90,7 +90,7 @@ public class SemiClusterDetails implemen
} else if (!semiClusterId.equals(other.semiClusterId))
return false;
return Double.doubleToLongBits(semiClusterScore) == Double
- .doubleToLongBits(other.semiClusterScore);
+ .doubleToLongBits(other.semiClusterScore);
}
@Override
@@ -100,10 +100,8 @@ public class SemiClusterDetails implemen
@Override
public void readFields(DataInput in) throws IOException {
- String semiClusterId = in.readUTF();
- setSemiClusterId(semiClusterId);
- double score = in.readDouble();
- setSemiClusterScore(score);
+ this.semiClusterId = in.readUTF();
+ this.semiClusterScore = in.readDouble();
}
@Override
Modified:
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterMessage.java
URL:
http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterMessage.java?rev=1569340&r1=1569339&r2=1569340&view=diff
==============================================================================
---
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterMessage.java
(original)
+++
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusterMessage.java
Tue Feb 18 14:27:51 2014
@@ -18,16 +18,22 @@
package org.apache.hama.ml.semiclustering;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hama.graph.Vertex;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.*;
-
/**
* The SemiClusterMessage class defines the structure of the value stored by
* each vertex in the graph Job which is same as the Message sent by each
@@ -37,35 +43,15 @@ import java.util.*;
public class SemiClusterMessage implements
WritableComparable<SemiClusterMessage> {
- private String semiClusterId;
- private double semiClusterScore;
- private List<Vertex<Text, DoubleWritable, SemiClusterMessage>>
semiClusterVertexList = new ArrayList<Vertex<Text, DoubleWritable,
SemiClusterMessage>>();
- private Set<SemiClusterDetails> semiClusterContainThis = new
TreeSet<SemiClusterDetails>();
-
- public SemiClusterMessage(String scId,
- List<Vertex<Text, DoubleWritable, SemiClusterMessage>> verticesEdges,
- double score) {
- this.semiClusterId = scId;
- this.semiClusterVertexList = verticesEdges;
- this.semiClusterScore = score;
- }
+ private String semiClusterId = null;
+ private double semiClusterScore = 0.0;
- public SemiClusterMessage(SemiClusterMessage msg) {
- this.semiClusterId = msg.getScId();
- for (Vertex<Text, DoubleWritable, SemiClusterMessage> v : msg
- .getVertexList())
- this.semiClusterVertexList.add(v);
- this.semiClusterScore = msg.getScore();
- }
-
- public SemiClusterMessage(Set<SemiClusterDetails> semiClusterContainThis) {
- this.semiClusterId = "";
- this.semiClusterScore = 0.0;
- this.semiClusterVertexList = null;
- this.semiClusterContainThis = semiClusterContainThis;
- }
+ private List<Vertex<Text, DoubleWritable, SemiClusterMessage>>
semiClusterVertexList;
+ private Set<SemiClusterDetails> semiClusterContainThis;
public SemiClusterMessage() {
+ semiClusterVertexList = new ArrayList<Vertex<Text, DoubleWritable,
SemiClusterMessage>>();
+ semiClusterContainThis = new TreeSet<SemiClusterDetails>();
}
public double getScore() {
@@ -84,20 +70,41 @@ public class SemiClusterMessage implemen
this.semiClusterVertexList.add(v);
}
- public String getScId() {
+ public void addVertexList(
+ List<Vertex<Text, DoubleWritable, SemiClusterMessage>> list) {
+ for (Vertex<Text, DoubleWritable, SemiClusterMessage> v : list) {
+ addVertex(v);
+ }
+ }
+
+ public void setSemiClusterContainThis(
+ Set<SemiClusterDetails> semiClusterContainThis) {
+ this.semiClusterContainThis = semiClusterContainThis;
+ }
+
+ public String getSemiClusterId() {
return semiClusterId;
}
- public void setScId(String scId) {
+ public void setSemiClusterId(String scId) {
this.semiClusterId = scId;
}
+ public boolean contains(Text vertexID) {
+ for (Vertex<Text, DoubleWritable, SemiClusterMessage> v :
this.semiClusterVertexList) {
+ if (v.getVertexID().equals(vertexID)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
public void readFields(DataInput in) throws IOException {
- clear();
- String semiClusterId = in.readUTF();
- setScId(semiClusterId);
- double score = in.readDouble();
- setScore(score);
+ if (in.readBoolean()) {
+ this.semiClusterId = in.readUTF();
+ }
+ this.semiClusterScore = in.readDouble();
+
if (in.readBoolean()) {
int len = in.readInt();
if (len > 0) {
@@ -119,13 +126,13 @@ public class SemiClusterMessage implemen
}
- private void clear() {
- semiClusterVertexList = new ArrayList<Vertex<Text, DoubleWritable,
SemiClusterMessage>>();
- semiClusterContainThis = new TreeSet<SemiClusterDetails>();
- }
-
public void write(DataOutput out) throws IOException {
- out.writeUTF(semiClusterId);
+ if (this.semiClusterId == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ out.writeUTF(semiClusterId);
+ }
out.writeDouble(semiClusterScore);
if (this.semiClusterVertexList == null) {
@@ -138,7 +145,8 @@ public class SemiClusterMessage implemen
}
}
out.writeInt(semiClusterContainThis.size());
- for (SemiClusterDetails semiClusterContainThi : semiClusterContainThis)
semiClusterContainThi.write(out);
+ for (SemiClusterDetails semiClusterContainThi : semiClusterContainThis)
+ semiClusterContainThi.write(out);
}
public Set<SemiClusterDetails> getSemiClusterContainThis() {
@@ -172,7 +180,7 @@ public class SemiClusterMessage implemen
}
public int compareTo(SemiClusterMessage m) {
- return (this.getScId().compareTo(m.getScId()));
+ return (this.getSemiClusterId().compareTo(m.getSemiClusterId()));
}
@Override
@@ -207,4 +215,8 @@ public class SemiClusterMessage implemen
+ semiClusterScore + ", semiClusterVertexList=" + semiClusterVertexList
+ ", semiClusterContainThis=" + semiClusterContainThis + "]";
}
+
+ public int size() {
+ return this.semiClusterVertexList.size();
+ }
}
Modified:
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java?rev=1569340&r1=1569339&r2=1569340&view=diff
==============================================================================
---
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java
(original)
+++
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java
Tue Feb 18 14:27:51 2014
@@ -21,7 +21,6 @@ package org.apache.hama.ml.semiclusterin
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -29,6 +28,7 @@ import java.util.TreeSet;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.Vertex;
@@ -47,10 +47,8 @@ public class SemiClusteringVertex extend
@Override
public void setup(HamaConfiguration conf) {
- semiClusterMaximumVertexCount = conf.getInt("semicluster.max.vertex.count",
- 10);
- graphJobMessageSentCount = conf.getInt(
- "semicluster.max.message.sent.count", 10);
+ semiClusterMaximumVertexCount =
conf.getInt("semicluster.max.vertex.count", 10);
+ graphJobMessageSentCount =
conf.getInt("semicluster.max.message.sent.count", 10);
graphJobVertexMaxClusterCount = conf.getInt("vertex.max.cluster.count",
10);
}
@@ -61,98 +59,82 @@ public class SemiClusteringVertex extend
@Override
public void compute(Iterable<SemiClusterMessage> messages) throws
IOException {
if (this.getSuperstepCount() == 0) {
- firstSuperStep();
+ initClusters();
}
if (this.getSuperstepCount() >= 1) {
- Set<SemiClusterMessage> scListContainThis = new
TreeSet<SemiClusterMessage>();
- Set<SemiClusterMessage> scListNotContainThis = new
TreeSet<SemiClusterMessage>();
- List<SemiClusterMessage> scList = new ArrayList<SemiClusterMessage>();
+ TreeSet<SemiClusterMessage> candidates = new
TreeSet<SemiClusterMessage>();
+
for (SemiClusterMessage msg : messages) {
- if (!isVertexInSc(msg)) {
- scListNotContainThis.add(msg);
- SemiClusterMessage msgNew = new SemiClusterMessage(msg);
+ candidates.add(msg);
+
+ if (!msg.contains(this.getVertexID())
+ && msg.size() == semiClusterMaximumVertexCount) {
+ SemiClusterMessage msgNew = WritableUtils.clone(msg, this.getConf());
msgNew.addVertex(this);
- msgNew
- .setScId("C" + createNewSemiClusterName(msgNew.getVertexList()));
+ msgNew.setSemiClusterId("C"
+ + createNewSemiClusterName(msgNew.getVertexList()));
msgNew.setScore(semiClusterScoreCalcuation(msgNew));
- scListContainThis.add(msgNew);
- } else {
- scListContainThis.add(msg);
+
+ candidates.add(msgNew);
}
}
- scList.addAll(scListContainThis);
- scList.addAll(scListNotContainThis);
- sendBestSCMsg(scList);
- updatesVertexSemiClustersList(scListContainThis);
- }
- }
- public List<SemiClusterMessage> addSCList(List<SemiClusterMessage> scList,
- SemiClusterMessage msg) {
- return scList;
- }
+ Iterator<SemiClusterMessage> bestCandidates =
candidates.descendingIterator();
+ int count = 0;
- /**
- * This function create a new Semi-cluster ID for a semi-cluster from the
list
- * of vertices in the cluster.It first take all the vertexIds as a list sort
- * the list and then find the HashCode of the Sorted List.
- */
- public int createNewSemiClusterName(
- List<Vertex<Text, DoubleWritable, SemiClusterMessage>>
semiClusterVertexList) {
- List<String> vertexIDList =
getSemiClusterVerticesIdList(semiClusterVertexList);
- Collections.sort(vertexIDList);
- return (vertexIDList.hashCode());
+ while (bestCandidates.hasNext() && count < graphJobMessageSentCount) {
+ SemiClusterMessage candidate = bestCandidates.next();
+ sendMessageToNeighbors(candidate);
+ count++;
+ }
+
+ // Update candidates
+ SemiClusterMessage value = this.getValue();
+ Set<SemiClusterDetails> clusters = value.getSemiClusterContainThis();
+ for (SemiClusterMessage msg : candidates) {
+ if (clusters.size() > graphJobVertexMaxClusterCount) {
+ break;
+ } else {
+ clusters.add(new SemiClusterDetails(msg.getSemiClusterId(),
msg.getScore()));
+ }
+ }
+
+ this.setValue(value);
+ }
}
- /**
- * Function which is executed in the first SuperStep
- *
- * @throws java.io.IOException
- */
- public void firstSuperStep() throws IOException {
- Vertex<Text, DoubleWritable, SemiClusterMessage> v = this.deepCopy();
+ private void initClusters() throws IOException {
List<Vertex<Text, DoubleWritable, SemiClusterMessage>> lV = new
ArrayList<Vertex<Text, DoubleWritable, SemiClusterMessage>>();
- lV.add(v);
+ lV.add(WritableUtils.clone(this, this.getConf()));
String newClusterName = "C" + createNewSemiClusterName(lV);
- SemiClusterMessage initialClusters = new SemiClusterMessage(newClusterName,
- lV, 1);
+ SemiClusterMessage initialClusters = new SemiClusterMessage();
+ initialClusters.setSemiClusterId(newClusterName);
+ initialClusters.addVertexList(lV);
+ initialClusters.setScore(1);
+
sendMessageToNeighbors(initialClusters);
Set<SemiClusterDetails> scList = new TreeSet<SemiClusterDetails>();
scList.add(new SemiClusterDetails(newClusterName, 1.0));
- SemiClusterMessage vertexValue = new SemiClusterMessage(scList);
+ SemiClusterMessage vertexValue = new SemiClusterMessage();
+ vertexValue.setSemiClusterContainThis(scList);
this.setValue(vertexValue);
}
/**
- * Vertex V updates its list of semi-clusters with the semi- clusters from c1
- * , ..., ck , c'1 , ..., c'k that contain V
+ * This function create a new Semi-cluster ID for a semi-cluster from the
list
+ * of vertices in the cluster.It first take all the vertexIds as a list sort
+ * the list and then find the HashCode of the Sorted List.
*/
- public void updatesVertexSemiClustersList(
- Set<SemiClusterMessage> scListContainThis) throws IOException {
- List<SemiClusterDetails> scList = new ArrayList<SemiClusterDetails>();
- Set<SemiClusterMessage> sortedSet = new TreeSet<SemiClusterMessage>(
- new Comparator<SemiClusterMessage>() {
-
- @Override
- public int compare(SemiClusterMessage o1, SemiClusterMessage o2) {
- return (o1.getScore() == o2.getScore() ? 0
- : o1.getScore() < o2.getScore() ? -1 : 1);
- }
- });
- sortedSet.addAll(scListContainThis);
- int count = 0;
- for (SemiClusterMessage msg : sortedSet) {
- scList.add(new SemiClusterDetails(msg.getScId(), msg.getScore()));
- if (count > graphJobMessageSentCount)
- break;
+ public int createNewSemiClusterName(
+ List<Vertex<Text, DoubleWritable, SemiClusterMessage>>
semiClusterVertexList) {
+ List<String> vertexIDList = new ArrayList<String>();
+ for (Vertex<Text, DoubleWritable, SemiClusterMessage> v :
semiClusterVertexList) {
+ vertexIDList.add(v.getVertexID().toString());
}
-
- SemiClusterMessage vertexValue = this.getValue();
- vertexValue
- .setSemiClusterContainThis(scList, graphJobVertexMaxClusterCount);
- this.setValue(vertexValue);
+ Collections.sort(vertexIDList);
+ return (vertexIDList.hashCode());
}
/**
@@ -164,15 +146,13 @@ public class SemiClusteringVertex extend
public double semiClusterScoreCalcuation(SemiClusterMessage message) {
double iC = 0.0, bC = 0.0, fB = 0.0, sC = 0.0;
int vC = 0, eC = 0;
- List<String> vertexId = getSemiClusterVerticesIdList(message
- .getVertexList());
- vC = vertexId.size();
+ vC = message.size();
for (Vertex<Text, DoubleWritable, SemiClusterMessage> v : message
.getVertexList()) {
List<Edge<Text, DoubleWritable>> eL = v.getEdges();
for (Edge<Text, DoubleWritable> e : eL) {
eC++;
- if (vertexId.contains(e.getDestinationVertexID().toString())
+ if (message.contains(e.getDestinationVertexID())
&& e.getValue() != null) {
iC = iC + e.getValue().get();
} else if (e.getValue() != null) {
@@ -185,55 +165,4 @@ public class SemiClusteringVertex extend
return sC;
}
- /**
- * Returns a Array List of vertexIds from a List of Vertex<Text,
- * DoubleWritable, SCMessage> Objects
- *
- * @param lV
- * @return
- */
- public List<String> getSemiClusterVerticesIdList(
- List<Vertex<Text, DoubleWritable, SemiClusterMessage>> lV) {
- Iterator<Vertex<Text, DoubleWritable, SemiClusterMessage>> vertexItrator =
lV
- .iterator();
- List<String> vertexId = new ArrayList<String>();
- while (vertexItrator.hasNext()) {
- vertexId.add(vertexItrator.next().getVertexID().toString());
- }
-
- return vertexId;
- }
-
- /**
- * If a semi-cluster c does not already contain V , and Vc < Mmax , then V is
- * added to c to form c' .
- */
- public boolean isVertexInSc(SemiClusterMessage msg) {
- List<String> vertexId = getSemiClusterVerticesIdList(msg.getVertexList());
- return vertexId.contains(this.getVertexID().toString())
- && vertexId.size() < semiClusterMaximumVertexCount;
- }
-
- /**
- * The semi-clusters c1 , ..., ck , c'1 , ..., c'k are sorted by their
scores,
- * and the best ones are sent to V's neighbors.
- */
- public void sendBestSCMsg(List<SemiClusterMessage> scList) throws
IOException {
- Collections.sort(scList, new Comparator<SemiClusterMessage>() {
-
- @Override
- public int compare(SemiClusterMessage o1, SemiClusterMessage o2) {
- return (o1.getScore() == o2.getScore() ? 0 : o1.getScore() < o2
- .getScore() ? -1 : 1);
- }
- });
- Iterator<SemiClusterMessage> scItr = scList.iterator();
- int count = 0;
- while (scItr.hasNext()) {
- sendMessageToNeighbors(scItr.next());
- count++;
- if (count > graphJobMessageSentCount)
- break;
- }
- }
}