Author: edwardyoon
Date: Wed Aug 7 01:19:04 2013
New Revision: 1511158
URL: http://svn.apache.org/r1511158
Log:
HAMA-594: Semi-Clustering Algorithm Implementation
Added:
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterDetails.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterMessage.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterTextReader.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterVertexOutputWriter.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusteringVertex.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1511158&r1=1511157&r2=1511158&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Aug 7 01:19:04 2013
@@ -4,6 +4,8 @@ Release 0.6.3 (unreleased changes)
NEW FEATURES
+ HAMA-594: Semi-Clustering Algorithm Implementation (Renil Jeseph via
edwardyoon)
+
BUG FIXES
HAMA-782: The arguments of DoubleVector.slice(int, int) method will mislead
the user. (Yexi Jiang)
Modified:
hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1511158&r1=1511157&r2=1511158&view=diff
==============================================================================
---
hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
(original)
+++
hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
Wed Aug 7 01:19:04 2013
@@ -36,6 +36,7 @@ public class ExampleDriver {
pgd.addClass("pagerank", PageRank.class, "PageRank");
pgd.addClass("inlnkcount", InlinkCount.class, "InlinkCount");
pgd.addClass("bipartite", BipartiteMatching.class, "Bipartite Matching");
+ pgd.addClass("semi", SemiClusterJobDriver.class, "Semi Clustering");
pgd.addClass("kmeans", Kmeans.class, "K-Means Clustering");
pgd.addClass("gd", GradientDescentExample.class, "Gradient Descent");
Added:
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterDetails.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterDetails.java?rev=1511158&view=auto
==============================================================================
---
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterDetails.java
(added)
+++
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterDetails.java
Wed Aug 7 01:19:04 2013
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.examples;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Holds the ID of a semi-cluster together with the score assigned to it and
+ * can be serialized through {@link #write(DataOutput)} /
+ * {@link #readFields(DataInput)}.
+ * <p>
+ * Note: this class has a natural ordering that is inconsistent with equals.
+ * {@link #compareTo(SemiClusterDetails)} orders instances by ID only, whereas
+ * {@link #equals(Object)} and {@link #hashCode()} also take the score into
+ * account.
+ */
+public class SemiClusterDetails implements
+    WritableComparable<SemiClusterDetails> {
+
+  private String semiClusterId;
+  private double semiClusterScore;
+
+  /** Creates an entry with an empty ID and a score of 1.0. */
+  public SemiClusterDetails() {
+    this("", 1.0);
+  }
+
+  /**
+   * @param semiClusterId the ID of the semi-cluster.
+   * @param semiClusterScore the score of the semi-cluster.
+   */
+  public SemiClusterDetails(String semiClusterId, double semiClusterScore) {
+    this.semiClusterId = semiClusterId;
+    this.semiClusterScore = semiClusterScore;
+  }
+
+  public String getSemiClusterId() {
+    return semiClusterId;
+  }
+
+  public void setSemiClusterId(String semiClusterId) {
+    this.semiClusterId = semiClusterId;
+  }
+
+  public double getSemiClusterScore() {
+    return semiClusterScore;
+  }
+
+  public void setSemiClusterScore(double semiClusterScore) {
+    this.semiClusterScore = semiClusterScore;
+  }
+
+  /** Combines the hash of the ID (0 when null) with the hash of the score. */
+  @Override
+  public int hashCode() {
+    int idHash = (semiClusterId == null) ? 0 : semiClusterId.hashCode();
+    long scoreBits = Double.doubleToLongBits(semiClusterScore);
+    int scoreHash = (int) (scoreBits ^ (scoreBits >>> 32));
+    return 31 * (31 + idHash) + scoreHash;
+  }
+
+  /**
+   * Two instances are equal when they are of the same class, have equal IDs
+   * (both may be null) and bitwise-identical scores.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    SemiClusterDetails that = (SemiClusterDetails) obj;
+    boolean sameId = (semiClusterId == null) ? that.semiClusterId == null
+        : semiClusterId.equals(that.semiClusterId);
+    return sameId
+        && Double.doubleToLongBits(semiClusterScore) == Double
+            .doubleToLongBits(that.semiClusterScore);
+  }
+
+  /** Returns the semi-cluster ID only; the score is not part of the text. */
+  @Override
+  public String toString() {
+    return semiClusterId;
+  }
+
+  /** Reads the ID (UTF) followed by the score (double). */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    setSemiClusterId(in.readUTF());
+    setSemiClusterScore(in.readDouble());
+  }
+
+  /** Writes the ID (UTF) followed by the score (double). */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(semiClusterId);
+    out.writeDouble(semiClusterScore);
+  }
+
+  /** Orders instances by their ID; the score does not take part. */
+  @Override
+  public int compareTo(SemiClusterDetails sc) {
+    String ownId = this.getSemiClusterId();
+    return ownId.compareTo(sc.getSemiClusterId());
+  }
+}
Added:
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java?rev=1511158&view=auto
==============================================================================
---
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java
(added)
+++
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java
Wed Aug 7 01:19:04 2013
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.examples;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.TextOutputFormat;
+import org.apache.hama.graph.GraphJob;
+
+/**
+ * Command-line entry point of the semi-clustering example: it parses the
+ * arguments into a {@link HamaConfiguration}, then configures and runs the
+ * graph job.
+ */
+public class SemiClusterJobDriver {
+
+  protected static final Log LOG = LogFactory
+      .getLog(SemiClusterJobDriver.class);
+
+  // Configuration keys used to hand the parsed arguments over to the job.
+  private static final String outputPathString = "semicluster.outputpath";
+  private static final String inputPathString = "semicluster.inputmatrixpath";
+  private static final String requestedGraphJobMaxIterationString = "hama.graph.max.iteration";
+  private static final String semiClusterMaximumVertexCount = "semicluster.max.vertex.count";
+  private static final String graphJobMessageSentCount = "semicluster.max.message.sent.count";
+  private static final String graphJobVertexMaxClusterCount = "vertex.max.cluster.count";
+
+  /**
+   * Builds the graph job from the given configuration, runs it and prints the
+   * elapsed time when it completes successfully.
+   *
+   * @param conf configuration that holds the input and output path entries
+   *          written by the argument parser.
+   */
+  public static void startTask(HamaConfiguration conf) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    GraphJob semiClusterJob = new GraphJob(conf, SemiClusterJobDriver.class);
+    semiClusterJob
+        .setVertexOutputWriterClass(SemiClusterVertexOutputWriter.class);
+    semiClusterJob.setJobName("SemiClusterJob");
+    semiClusterJob.setVertexClass(SemiClusteringVertex.class);
+    semiClusterJob.setInputPath(new Path(conf.get(inputPathString)));
+    semiClusterJob.setOutputPath(new Path(conf.get(outputPathString)));
+
+    semiClusterJob.set("hama.graph.self.ref", "true");
+    semiClusterJob.set("hama.graph.repair", "true");
+
+    // Graph types.
+    semiClusterJob.setVertexIDClass(Text.class);
+    semiClusterJob.setVertexValueClass(SemiClusterMessage.class);
+    semiClusterJob.setEdgeValueClass(DoubleWritable.class);
+
+    // Input side.
+    semiClusterJob.setInputKeyClass(LongWritable.class);
+    semiClusterJob.setInputValueClass(Text.class);
+    semiClusterJob.setInputFormat(TextInputFormat.class);
+    semiClusterJob.setVertexInputReaderClass(SemiClusterTextReader.class);
+
+    semiClusterJob.setPartitioner(HashPartitioner.class);
+
+    // Output side.
+    semiClusterJob.setOutputFormat(TextOutputFormat.class);
+    semiClusterJob.setOutputKeyClass(Text.class);
+    semiClusterJob.setOutputValueClass(Text.class);
+
+    long startTime = System.currentTimeMillis();
+    if (semiClusterJob.waitForCompletion(true)) {
+      System.out.println("Job Finished in "
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+    }
+  }
+
+  /** Logs the expected command-line arguments. */
+  private static void printUsage() {
+    // The third argument is stored under "hama.graph.max.iteration", so it is
+    // described as an iteration limit rather than a task count.
+    LOG.info("Usage: SemiClusterO <input path> <output path>"
+        + " [maximum number of iterations]"
+        + " [Maximum number of vertices in a Semi Cluster (default 10)]"
+        + " [Number of messages sent from a Vertex(default 10)]"
+        + "[Maximum number of clusters in which a vertex can be contained(default 10)]");
+  }
+
+  /**
+   * Parses the command line into the configuration. The first two arguments
+   * (input path, output path) are mandatory; when fewer are given the usage is
+   * logged and the JVM exits with status -1. Arguments three to six are
+   * optional non-negative integers.
+   *
+   * @throws IllegalArgumentException if an optional argument is not an int or
+   *           is negative.
+   */
+  private static void parseArgs(HamaConfiguration conf, String[] args) {
+    if (args.length < 2) {
+      printUsage();
+      System.exit(-1);
+    }
+
+    conf.set(inputPathString, args[0]);
+    conf.set(outputPathString, new Path(args[1]).toString());
+
+    if (args.length >= 3) {
+      conf.setInt(
+          requestedGraphJobMaxIterationString,
+          parseNonNegativeInt(args[2], "job maximum iteration count",
+              "The number of requested job maximum iteration count"));
+    }
+    if (args.length >= 4) {
+      conf.setInt(
+          semiClusterMaximumVertexCount,
+          parseNonNegativeInt(args[3], "maximum vertex count",
+              "The number of maximum vertex count"));
+    }
+    if (args.length >= 5) {
+      conf.setInt(
+          graphJobMessageSentCount,
+          parseNonNegativeInt(args[4], "maximum message sent count",
+              "The number of maximum message sent count"));
+    }
+    // ">=" so that the sixth argument is not silently ignored when additional
+    // arguments follow it.
+    if (args.length >= 6) {
+      conf.setInt(
+          graphJobVertexMaxClusterCount,
+          parseNonNegativeInt(args[5], "vertex maximum cluster count",
+              "The maximum number of clusters in which a vertex can be contained"));
+    }
+  }
+
+  /**
+   * Parses one optional argument, reporting the argument that actually failed.
+   *
+   * @param raw the argument text.
+   * @param name short name of the argument, used when it is not an int.
+   * @param negativeMessagePrefix start of the message used when it is negative.
+   * @return the parsed, non-negative value.
+   * @throws IllegalArgumentException if {@code raw} is not an int or is
+   *           negative; the usage is logged first.
+   */
+  private static int parseNonNegativeInt(String raw, String name,
+      String negativeMessagePrefix) {
+    final int parsed;
+    try {
+      parsed = Integer.parseInt(raw);
+    } catch (NumberFormatException e) {
+      printUsage();
+      throw new IllegalArgumentException("The format of " + name
+          + " is int. Can not parse value: " + raw, e);
+    }
+    if (parsed < 0) {
+      printUsage();
+      throw new IllegalArgumentException(negativeMessagePrefix
+          + " can't be negative. Actual value: " + String.valueOf(parsed));
+    }
+    return parsed;
+  }
+
+  /** Parses the arguments and runs the job. */
+  public static void main(String[] args) throws Exception {
+    HamaConfiguration conf = new HamaConfiguration();
+    parseArgs(conf, args);
+    startTask(conf);
+  }
+}
Added:
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterMessage.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterMessage.java?rev=1511158&view=auto
==============================================================================
---
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterMessage.java
(added)
+++
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterMessage.java
Wed Aug 7 01:19:04 2013
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.examples;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.graph.Vertex;
+
+/**
+ * The SemiClusterMessage class defines the structure of the value stored by
+ * each vertex in the graph Job which is same as the Message sent my each
+ * vertex.
+ *
+ */
+public class SemiClusterMessage implements
+ WritableComparable<SemiClusterMessage> {
+
+ private String semiClusterId;
+ private double semiClusterScore;
+ private List<Vertex<Text, DoubleWritable, SemiClusterMessage>>
semiClusterVertexList = new ArrayList<Vertex<Text, DoubleWritable,
SemiClusterMessage>>();
+ private Set<SemiClusterDetails> semiClusterContainThis = new
TreeSet<SemiClusterDetails>();
+
+ public SemiClusterMessage(String scId,
+ List<Vertex<Text, DoubleWritable, SemiClusterMessage>> verticesEdges,
+ double score) {
+ this.semiClusterId = scId;
+ this.semiClusterVertexList = verticesEdges;
+ this.semiClusterScore = score;
+ }
+
+ public SemiClusterMessage(SemiClusterMessage msg) {
+ this.semiClusterId = msg.getScId();
+ for (Vertex<Text, DoubleWritable, SemiClusterMessage> v : msg
+ .getVertexList())
+ this.semiClusterVertexList.add(v);
+ this.semiClusterScore = msg.getScore();
+ }
+
+ public SemiClusterMessage(Set<SemiClusterDetails> semiClusterContainThis) {
+ this.semiClusterId = "";
+ this.semiClusterScore = 0.0;
+ this.semiClusterVertexList = null;
+ this.semiClusterContainThis = semiClusterContainThis;
+ }
+
+ public SemiClusterMessage() {
+ }
+
+ public double getScore() {
+ return semiClusterScore;
+ }
+
+ public void setScore(double score) {
+ this.semiClusterScore = score;
+ }
+
+ public List<Vertex<Text, DoubleWritable, SemiClusterMessage>>
getVertexList() {
+ return semiClusterVertexList;
+ }
+
+ public void addVertex(Vertex<Text, DoubleWritable, SemiClusterMessage> v) {
+ this.semiClusterVertexList.add(v);
+ }
+
+ public String getScId() {
+ return semiClusterId;
+ }
+
+ public void setScId(String scId) {
+ this.semiClusterId = scId;
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ clear();
+ String semiClusterId = in.readUTF();
+ setScId(semiClusterId);
+ double score = in.readDouble();
+ setScore(score);
+ if (in.readBoolean()) {
+ int len = in.readInt();
+ if (len > 0) {
+ for (int i = 0; i < len; i++) {
+ SemiClusteringVertex v = new SemiClusteringVertex();
+ v.readFields(in);
+ semiClusterVertexList.add(v);
+ }
+ }
+ }
+ int len = in.readInt();
+ if (len > 0) {
+ for (int i = 0; i < len; i++) {
+ SemiClusterDetails sd = new SemiClusterDetails();
+ sd.readFields(in);
+ semiClusterContainThis.add(sd);
+ }
+ }
+
+ }
+
+ private void clear() {
+ semiClusterVertexList = new ArrayList<Vertex<Text, DoubleWritable,
SemiClusterMessage>>();
+ semiClusterContainThis = new TreeSet<SemiClusterDetails>();
+ }
+
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(semiClusterId);
+ out.writeDouble(semiClusterScore);
+
+ if (this.semiClusterVertexList == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ out.writeInt(semiClusterVertexList.size());
+ for (Vertex<Text, DoubleWritable, SemiClusterMessage> v :
semiClusterVertexList) {
+ v.write(out);
+ }
+ }
+ out.writeInt(semiClusterContainThis.size());
+ Iterator<SemiClusterDetails> itr = semiClusterContainThis.iterator();
+ while (itr.hasNext())
+ itr.next().write(out);
+ }
+
+ public Set<SemiClusterDetails> getSemiClusterContainThis() {
+ return semiClusterContainThis;
+ }
+
+ public void setSemiClusterContainThis(
+ List<SemiClusterDetails> semiClusterContainThis,
+ int graphJobVertexMaxClusterCount) {
+ int clusterCountToBeRemoved = 0;
+ NavigableSet<SemiClusterDetails> setSort = new TreeSet<SemiClusterDetails>(
+ new Comparator<SemiClusterDetails>() {
+
+ @Override
+ public int compare(SemiClusterDetails o1, SemiClusterDetails o2) {
+ return (o1.getSemiClusterScore() == o2.getSemiClusterScore() ? 0
+ : o1.getSemiClusterScore() < o2.getSemiClusterScore() ? -1 :
1);
+ }
+ });
+ setSort.addAll(this.semiClusterContainThis);
+ setSort.addAll(semiClusterContainThis);
+ clusterCountToBeRemoved = setSort.size() - graphJobVertexMaxClusterCount;
+ Iterator<SemiClusterDetails> itr = setSort.descendingIterator();
+ while (clusterCountToBeRemoved > 0) {
+ itr.next();
+ itr.remove();
+ clusterCountToBeRemoved--;
+ }
+ this.semiClusterContainThis = setSort;
+
+ }
+
+ public int compareTo(SemiClusterMessage m) {
+ return (this.getScId().compareTo(m.getScId()));
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + ((semiClusterId == null) ? 0 : semiClusterId.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ SemiClusterMessage other = (SemiClusterMessage) obj;
+ if (semiClusterId == null) {
+ if (other.semiClusterId != null)
+ return false;
+ } else if (!semiClusterId.equals(other.semiClusterId))
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "SCMessage [semiClusterId=" + semiClusterId + ", semiClusterScore="
+ + semiClusterScore + ", semiClusterVertexList=" + semiClusterVertexList
+ + ", semiClusterContainThis=" + semiClusterContainThis + "]";
+ }
+}
Added:
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterTextReader.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterTextReader.java?rev=1511158&view=auto
==============================================================================
---
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterTextReader.java
(added)
+++
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterTextReader.java
Wed Aug 7 01:19:04 2013
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.examples;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.graph.Edge;
+import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
+
+/**
+ * SemiClusterTextReader defines how the input file is turned into vertices.
+ * Each data line holds two tab-separated fields: a vertex ID and one adjacent
+ * entry of the form {@code <neighbor id>-<edge weight>}. Lines starting with
+ * {@code #} and blank lines are ignored.
+ * <p>
+ * The adjacent entries of consecutive lines with the same vertex ID are
+ * buffered; the vertex is emitted once a line with a different ID is read.
+ * <p>
+ * NOTE(review): nothing in this class flushes the buffer at the end of the
+ * input, so the entries of the last vertex ID appear never to be emitted -
+ * confirm against the calling framework.
+ */
+public class SemiClusterTextReader extends
+    VertexInputReader<LongWritable, Text, Text, DoubleWritable, Text> {
+
+  // ID of the vertex whose adjacent entries are currently being buffered.
+  String lastVertexId = null;
+  // Raw "<neighbor id>-<edge weight>" entries collected for lastVertexId.
+  List<String> adjacents = new ArrayList<String>();
+
+  /**
+   * Consumes one input line.
+   *
+   * @param key the input key (not used).
+   * @param value the input line.
+   * @param vertex the vertex to fill when a complete vertex is available.
+   * @return true when {@code vertex} has been filled with the ID and edges of
+   *         the previously buffered vertex, false when the line was only
+   *         buffered or ignored.
+   * @throws IllegalArgumentException if a data line does not consist of two
+   *           tab-separated fields.
+   */
+  @Override
+  public boolean parseVertex(LongWritable key, Text value,
+      Vertex<Text, DoubleWritable, Text> vertex) {
+
+    String line = value.toString();
+    // Comments and blank lines (e.g. a trailing empty line) carry no data.
+    if (line.startsWith("#") || line.trim().length() == 0) {
+      return false;
+    }
+
+    String[] lineSplit = line.split("\t");
+    if (lineSplit.length < 2) {
+      throw new IllegalArgumentException(
+          "Malformed input line, expected two tab-separated fields: " + line);
+    }
+    String vertexId = lineSplit[0];
+    String adjacentEntry = lineSplit[1];
+
+    if (lastVertexId == null) {
+      lastVertexId = vertexId;
+    }
+    if (lastVertexId.equals(vertexId)) {
+      adjacents.add(adjacentEntry);
+      return false;
+    }
+
+    // A new vertex ID starts: emit the buffered vertex, then start buffering
+    // for the new one.
+    vertex.setVertexID(new Text(lastVertexId));
+    for (String adjacent : adjacents) {
+      String[] valueSplit = adjacent.split("-");
+      vertex.addEdge(new Edge<Text, DoubleWritable>(new Text(valueSplit[0]),
+          new DoubleWritable(Double.parseDouble(valueSplit[1]))));
+    }
+    adjacents.clear();
+    lastVertexId = vertexId;
+    adjacents.add(adjacentEntry);
+    return true;
+  }
+
+}
Added:
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterVertexOutputWriter.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterVertexOutputWriter.java?rev=1511158&view=auto
==============================================================================
---
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterVertexOutputWriter.java
(added)
+++
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterVertexOutputWriter.java
Wed Aug 7 01:19:04 2013
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.examples;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.graph.GraphJobMessage;
+import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexOutputWriter;
+
+/**
+ * Output writer of the semi-clustering job: for every vertex it emits the
+ * vertex ID as key and, as value, the textual form of the set of semi-clusters
+ * stored in the vertex value.
+ *
+ * @param <KEYOUT> the output key type.
+ * @param <VALUEOUT> the output value type.
+ * @param <V> the vertexID type.
+ * @param <E> the edge value type.
+ * @param <M> the vertex value type.
+ */
+@SuppressWarnings("rawtypes")
+public class SemiClusterVertexOutputWriter<KEYOUT extends Writable, VALUEOUT extends Writable, V extends WritableComparable, E extends Writable, M extends Writable>
+    implements VertexOutputWriter<KEYOUT, VALUEOUT, V, E, M> {
+
+  @Override
+  public void setup(Configuration conf) {
+    // nothing to configure
+  }
+
+  /**
+   * Writes one record for the given vertex. The vertex value is cast to
+   * {@link SemiClusterMessage}.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void write(Vertex<V, E, M> vertex,
+      BSPPeer<Writable, Writable, KEYOUT, VALUEOUT, GraphJobMessage> peer)
+      throws IOException {
+    SemiClusterMessage vertexValue = (SemiClusterMessage) vertex.getValue();
+    KEYOUT outputKey = (KEYOUT) vertex.getVertexID();
+    VALUEOUT outputValue = (VALUEOUT) new Text(vertexValue
+        .getSemiClusterContainThis().toString());
+    peer.write(outputKey, outputValue);
+  }
+
+}
Added:
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusteringVertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusteringVertex.java?rev=1511158&view=auto
==============================================================================
---
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusteringVertex.java
(added)
+++
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusteringVertex.java
Wed Aug 7 01:19:04 2013
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.examples;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.graph.Edge;
+import org.apache.hama.graph.Vertex;
+
+/**
+ * SemiClusteringVertex defines each vertex of the semi-clustering graph job;
+ * {@link #compute(Iterable)} is the function applied to each vertex in every
+ * superstep of the job execution.
+ */
+public class SemiClusteringVertex extends
+    Vertex<Text, DoubleWritable, SemiClusterMessage> {
+
+  /** Orders semi-clusters by ascending score. */
+  private static final Comparator<SemiClusterMessage> BY_ASCENDING_SCORE = new Comparator<SemiClusterMessage>() {
+    @Override
+    public int compare(SemiClusterMessage o1, SemiClusterMessage o2) {
+      return Double.compare(o1.getScore(), o2.getScore());
+    }
+  };
+
+  private int semiClusterMaximumVertexCount;
+  private int graphJobMessageSentCount;
+  private int graphJobVertexMaxClusterCount;
+
+  /** Reads the three limits from the configuration; each defaults to 10. */
+  @Override
+  public void setup(Configuration conf) {
+    semiClusterMaximumVertexCount = conf.getInt("semicluster.max.vertex.count",
+        10);
+    graphJobMessageSentCount = conf.getInt(
+        "semicluster.max.message.sent.count", 10);
+    graphJobVertexMaxClusterCount = conf.getInt("vertex.max.cluster.count", 10);
+  }
+
+  /**
+   * Executed for the vertex in every superstep. In superstep 0 the vertex
+   * announces a semi-cluster that contains only itself. In later supersteps
+   * every received semi-cluster that already contains this vertex is kept as
+   * is; every other one is forwarded unchanged and, if it still has fewer
+   * vertices than the configured maximum, additionally extended by this
+   * vertex. The selected semi-clusters are then sent to the neighbors and the
+   * ones containing this vertex are recorded in the vertex value.
+   */
+  @Override
+  public void compute(Iterable<SemiClusterMessage> messages) throws IOException {
+    if (this.getSuperstepCount() == 0) {
+      firstSuperStep();
+    }
+    if (this.getSuperstepCount() >= 1) {
+      Set<SemiClusterMessage> scListContainThis = new TreeSet<SemiClusterMessage>();
+      Set<SemiClusterMessage> scListNotContainThis = new TreeSet<SemiClusterMessage>();
+      String ownId = this.getVertexID().toString();
+      for (SemiClusterMessage msg : messages) {
+        List<String> memberIds = getSemiClusterVerticesIdList(msg
+            .getVertexList());
+        if (memberIds.contains(ownId)) {
+          // Already a member: never add this vertex a second time.
+          scListContainThis.add(msg);
+          continue;
+        }
+        scListNotContainThis.add(msg);
+        // Only clusters that still have room are extended by this vertex.
+        if (memberIds.size() < semiClusterMaximumVertexCount) {
+          SemiClusterMessage msgNew = new SemiClusterMessage(msg);
+          msgNew.addVertex(this);
+          msgNew
+              .setScId("C" + createNewSemiClusterName(msgNew.getVertexList()));
+          msgNew.setScore(semiClusterScoreCalcuation(msgNew));
+          scListContainThis.add(msgNew);
+        }
+      }
+      List<SemiClusterMessage> scList = new ArrayList<SemiClusterMessage>();
+      scList.addAll(scListContainThis);
+      scList.addAll(scListNotContainThis);
+      sendBestSCMsg(scList);
+      updatesVertexSemiClustersList(scListContainThis);
+    }
+  }
+
+  /** Returns {@code scList} unchanged; {@code msg} is not used. */
+  public List<SemiClusterMessage> addSCList(List<SemiClusterMessage> scList,
+      SemiClusterMessage msg) {
+    return scList;
+  }
+
+  /**
+   * Creates the numeric part of a semi-cluster ID from the vertices of the
+   * cluster: the vertex IDs are collected, sorted, and the hash code of the
+   * sorted list is returned.
+   */
+  public int createNewSemiClusterName(
+      List<Vertex<Text, DoubleWritable, SemiClusterMessage>> semiClusterVertexList) {
+    List<String> vertexIDList = getSemiClusterVerticesIdList(semiClusterVertexList);
+    Collections.sort(vertexIDList);
+    return (vertexIDList.hashCode());
+  }
+
+  /**
+   * Function which is executed in the first superstep: sends a semi-cluster
+   * containing only this vertex (score 1) to the neighbors and initializes the
+   * vertex value with that cluster.
+   *
+   * @throws IOException
+   */
+  public void firstSuperStep() throws IOException {
+    Vertex<Text, DoubleWritable, SemiClusterMessage> v = this;
+    List<Vertex<Text, DoubleWritable, SemiClusterMessage>> lV = new ArrayList<Vertex<Text, DoubleWritable, SemiClusterMessage>>();
+    lV.add(v);
+    String newClusterName = "C" + createNewSemiClusterName(lV);
+    SemiClusterMessage initialClusters = new SemiClusterMessage(newClusterName,
+        lV, 1);
+    sendMessageToNeighbors(initialClusters);
+    Set<SemiClusterDetails> scList = new TreeSet<SemiClusterDetails>();
+    scList.add(new SemiClusterDetails(newClusterName, 1.0));
+    SemiClusterMessage vertexValue = new SemiClusterMessage(scList);
+    this.setValue(vertexValue);
+  }
+
+  /**
+   * Vertex V updates its list of semi-clusters with the semi-clusters from c1
+   * , ..., ck , c'1 , ..., c'k that contain V. Every given semi-cluster is
+   * handed to the vertex value, which limits the stored list to the configured
+   * maximum number of clusters per vertex.
+   */
+  public void updatesVertexSemiClustersList(
+      Set<SemiClusterMessage> scListContainThis) throws IOException {
+    // Each semi-cluster is passed on individually; clusters must not be
+    // collapsed merely because their scores happen to be equal.
+    List<SemiClusterDetails> scList = new ArrayList<SemiClusterDetails>(
+        scListContainThis.size());
+    for (SemiClusterMessage msg : scListContainThis) {
+      scList.add(new SemiClusterDetails(msg.getScId(), msg.getScore()));
+    }
+
+    SemiClusterMessage vertexValue = this.getValue();
+    vertexValue
+        .setSemiClusterContainThis(scList, graphJobVertexMaxClusterCount);
+    this.setValue(vertexValue);
+  }
+
+  /**
+   * Function to calculate the score of a semi-cluster.
+   *
+   * @param message the semi-cluster to score.
+   * @return the score; 0.0 when the cluster has fewer than two vertices or its
+   *         vertices have no edges.
+   */
+  public double semiClusterScoreCalcuation(SemiClusterMessage message) {
+    // iC: summed weight of edges whose destination lies inside the cluster,
+    // bC: summed weight of the remaining edges.
+    double iC = 0.0, bC = 0.0, sC = 0.0;
+    // Factor applied to bC; fixed at 0.0 here, so bC does not influence the
+    // score.
+    final double fB = 0.0;
+    int eC = 0;
+    List<String> vertexId = getSemiClusterVerticesIdList(message
+        .getVertexList());
+    int vC = vertexId.size();
+    for (Vertex<Text, DoubleWritable, SemiClusterMessage> v : message
+        .getVertexList()) {
+      for (Edge<Text, DoubleWritable> e : v.getEdges()) {
+        eC++;
+        if (e.getCost() == null) {
+          continue;
+        }
+        if (vertexId.contains(e.getDestinationVertexID().toString())) {
+          iC = iC + e.getCost().get();
+        } else {
+          bC = bC + e.getCost().get();
+        }
+      }
+    }
+    // eC > 0 avoids a division by zero (NaN/Infinity score) for clusters
+    // whose vertices carry no edges.
+    if (vC > 1 && eC > 0)
+      sC = ((iC - fB * bC) / ((vC * (vC - 1)) / 2)) / eC;
+    return sC;
+  }
+
+  /**
+   * Returns a list of vertex IDs (as strings) for a list of Vertex<Text,
+   * DoubleWritable, SemiClusterMessage> objects, in the same order.
+   *
+   * @param lV the vertices.
+   * @return a new list holding the ID of every vertex.
+   */
+  public List<String> getSemiClusterVerticesIdList(
+      List<Vertex<Text, DoubleWritable, SemiClusterMessage>> lV) {
+    List<String> vertexId = new ArrayList<String>();
+    for (Vertex<Text, DoubleWritable, SemiClusterMessage> vertex : lV) {
+      vertexId.add(vertex.getVertexID().toString());
+    }
+    return vertexId;
+  }
+
+  /**
+   * Returns true only when the semi-cluster contains this vertex and, in
+   * addition, has fewer vertices than the configured maximum. A full
+   * semi-cluster therefore yields false even if this vertex is one of its
+   * members.
+   */
+  public boolean isVertexInSc(SemiClusterMessage msg) {
+    List<String> vertexId = getSemiClusterVerticesIdList(msg.getVertexList());
+    return vertexId.contains(this.getVertexID().toString())
+        && vertexId.size() < semiClusterMaximumVertexCount;
+  }
+
+  /**
+   * The semi-clusters c1 , ..., ck , c'1 , ..., c'k are sorted by their
+   * scores, and at most the configured number of them is sent to the
+   * neighbors of V.
+   */
+  public void sendBestSCMsg(List<SemiClusterMessage> scList) throws IOException {
+    // NOTE(review): the list is sorted by ascending score and the head of the
+    // list is sent, i.e. the lowest-scored entries. If a higher score means a
+    // better cluster this looks inverted - confirm before changing.
+    Collections.sort(scList, BY_ASCENDING_SCORE);
+    int sent = 0;
+    for (SemiClusterMessage msg : scList) {
+      // Checked before sending so that never more than the limit goes out.
+      if (sent >= graphJobMessageSentCount) {
+        break;
+      }
+      sendMessageToNeighbors(msg);
+      sent++;
+    }
+  }
+}