Author: edwardyoon
Date: Thu Mar 26 08:27:24 2015
New Revision: 1669291
URL: http://svn.apache.org/r1669291
Log:
HAMA-894: Implement K-core algorithm (Jaegwon Seo via edwardyoon)
Added:
hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/KCoreTest.java
hama/trunk/examples/src/test/resources/kcore.txt
hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/
hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreMessage.java
hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertex.java
hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexReader.java
hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexWriter.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1669291&r1=1669290&r2=1669291&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Mar 26 08:27:24 2015
@@ -4,6 +4,7 @@ Release 0.7.0 (unreleased changes)
NEW FEATURES
+ HAMA-894: Implement K-core algorithm (Jaegwon Seo via edwardyoon)
HAMA-907: Add MaxFlow example (Zhengjun via edwardyoon)
HAMA-915: Add Kryo serializer (edwardyoon)
HAMA-726: Hama on Mesos (Jeff Fenchel via edwardyoon)
Modified:
hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1669291&r1=1669290&r2=1669291&view=diff
==============================================================================
---
hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
(original)
+++
hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
Thu Mar 26 08:27:24 2015
@@ -39,8 +39,9 @@ public class ExampleDriver {
pgd.addClass("semi", SemiClusterJobDriver.class, "Semi Clustering");
pgd.addClass("kmeans", Kmeans.class, "K-Means Clustering");
pgd.addClass("gd", GradientDescentExample.class, "Gradient Descent");
- pgd.addClass("neuralnets", NeuralNetwork.class, "Neural Network
classification");
-
+ pgd.addClass("neuralnets", NeuralNetwork.class,
+ "Neural Network classification");
+ pgd.addClass("kcore", KCore.class, "kcore");
pgd.addClass("gen", Generator.class, "Random Data Generator Util");
pgd.driver(args);
} catch (Throwable e) {
Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java?rev=1669291&view=auto
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java
(added)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java Thu
Mar 26 08:27:24 2015
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.graph.GraphJob;
+import org.apache.hama.ml.kcore.KCoreMessage;
+import org.apache.hama.ml.kcore.KCoreVertex;
+import org.apache.hama.ml.kcore.KCoreVertexReader;
+import org.apache.hama.ml.kcore.KCoreVertexWriter;
+
+public class KCore {
+ private static GraphJob createJob(String[] args, HamaConfiguration conf)
+ throws IOException {
+ GraphJob graphJob = new GraphJob(conf, KCore.class);
+ graphJob.setJobName("KCore");
+
+ graphJob.setInputPath(new Path(args[0]));
+ graphJob.setOutputPath(new Path(args[1]));
+
+ graphJob.setVertexClass(KCoreVertex.class);
+ graphJob.setVertexIDClass(LongWritable.class);
+ graphJob.setEdgeValueClass(LongWritable.class);
+ graphJob.setVertexValueClass(KCoreMessage.class);
+
+ graphJob.setVertexInputReaderClass(KCoreVertexReader.class);
+ graphJob.setVertexOutputWriterClass(KCoreVertexWriter.class);
+
+ graphJob.setOutputKeyClass(LongWritable.class);
+ graphJob.setOutputValueClass(IntWritable.class);
+
+ return graphJob;
+ }
+
+ private static void printUsage() {
+ System.out.println("Usage: <input> <output>");
+ System.exit(-1);
+ }
+
+ public static void main(String[] args) throws IOException,
+ ClassNotFoundException, InterruptedException {
+ if (args.length != 2) {
+ printUsage();
+ }
+ HamaConfiguration conf = new HamaConfiguration(new Configuration());
+ GraphJob graphJob = createJob(args, conf);
+ long startTime = System.currentTimeMillis();
+ if (graphJob.waitForCompletion(true)) {
+ System.out.println("Job Finished in "
+ + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+ }
+ }
+}
Added: hama/trunk/examples/src/test/java/org/apache/hama/examples/KCoreTest.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/KCoreTest.java?rev=1669291&view=auto
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/KCoreTest.java
(added)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/KCoreTest.java
Thu Mar 26 08:27:24 2015
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.HamaConfiguration;
+import org.junit.Test;
+
+public class KCoreTest extends TestCase {
+ private static String INPUT = "src/test/resources/kcore.txt";
+ private static String OUTPUT = "/tmp/kcore-out";
+ private Configuration conf = new HamaConfiguration();
+ private FileSystem fs;
+
+ Map<String, String> outResults = new HashMap<String, String>();
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ fs = FileSystem.get(conf);
+ }
+
+ @Test
+ public void testKcore() throws IllegalArgumentException, IOException,
+ ClassNotFoundException, InterruptedException {
+ try {
+ setOutputResult();
+ KCore.main(new String[] { INPUT, OUTPUT });
+ verifyResult();
+ } finally {
+ fs.exists(new Path(OUTPUT));
+ }
+ }
+
+ public void setOutputResult() {
+ String[] vertex = { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10",
"11" };
+ String[] core = { "1", "3", "1", "1", "3", "2", "1", "3", "3", "2", "2" };
+
+ for (int i = 0; i < vertex.length; i++) {
+ outResults.put(vertex[i], core[i]);
+ }
+ }
+
+ private void verifyResult() throws IllegalArgumentException, IOException {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(
+ fs.open(new Path(OUTPUT + "/part-00000"))));
+
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ String[] temp = line.split("\t");
+ assertTrue(outResults.containsKey(temp[0]));
+ assertEquals(temp[1], outResults.get(temp[0]));
+ }
+ }
+}
Added: hama/trunk/examples/src/test/resources/kcore.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/test/resources/kcore.txt?rev=1669291&view=auto
==============================================================================
--- hama/trunk/examples/src/test/resources/kcore.txt (added)
+++ hama/trunk/examples/src/test/resources/kcore.txt Thu Mar 26 08:27:24 2015
@@ -0,0 +1,11 @@
+1 2
+2 1 3 9 8 5
+3 2
+4 5
+5 2 4 6 8 9
+6 5 7 8
+7 6
+8 2 5 6 9 10 11
+9 2 5 8
+10 8 11
+11 8 10
\ No newline at end of file
Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreMessage.java
URL:
http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreMessage.java?rev=1669291&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreMessage.java
(added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreMessage.java Thu
Mar 26 08:27:24 2015
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.kcore;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class KCoreMessage implements Writable {
+ private long vertexID;
+ private int core;
+
+ public KCoreMessage() {
+ this.vertexID = 0L;
+ this.core = 0;
+ }
+
+ public KCoreMessage(long vertexID, int core) {
+ this.vertexID = vertexID;
+ this.core = core;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ vertexID = input.readLong();
+ core = input.readInt();
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ output.writeLong(vertexID);
+ output.writeInt(core);
+ }
+
+ public long getVertexID(){
+ return vertexID;
+ }
+
+ public int getCore(){
+ return core;
+ }
+}
Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertex.java?rev=1669291&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertex.java
(added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertex.java Thu
Mar 26 08:27:24 2015
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.kcore;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hama.graph.Edge;
+import org.apache.hama.graph.Vertex;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+public class KCoreVertex extends
+ Vertex<LongWritable, LongWritable, KCoreMessage> {
+
+ private static final Logger logger = Logger.getLogger(KCoreVertex.class);
+ private int core;
+ private boolean changed;
+ private HashMap<Long, Integer> estimates;
+
+ public KCoreVertex() {
+ super();
+ this.changed = false;
+ this.core = 0;
+ this.estimates = new HashMap<Long, Integer>();
+ }
+
+ public KCoreVertex(int core) {
+ super();
+ this.changed = false;
+ this.core = core;
+ this.estimates = new HashMap<Long, Integer>();
+ }
+
+ public KCoreVertex(int core, HashMap<Long, Integer> estimates) {
+ super();
+ this.changed = false;
+ this.core = core;
+ this.estimates = estimates;
+ }
+
+ public boolean isChanged() {
+ return changed;
+ }
+
+ public void setChanged(boolean changed) {
+ this.changed = changed;
+ }
+
+ public int getCore() {
+ return core;
+ }
+
+ public void setCore(int core) {
+ this.core = core;
+ }
+
+ public double getNeighborEstimate(long neighbor) {
+ if (estimates.containsKey(neighbor)) {
+ return estimates.get(neighbor);
+ }
+ return (-1);
+ }
+
+ public void setNeighborNewEstimate(long neighbor, int estimate) {
+ if (this.estimates.containsKey(neighbor)) {
+ this.estimates.put(neighbor, estimate);
+ }
+ }
+
+ public void logNeighborEstimates(long vertexId) {
+ logger.info(vertexId + " neighbor estimates: ");
+ for (Map.Entry<Long, Integer> entry : estimates.entrySet()) {
+ logger.info("\t" + entry.getKey() + "-" + entry.getValue());
+ }
+ }
+
+ public void setNeighborEstimate(long neighbor, int estimate) {
+ estimates.put(neighbor, estimate);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ core = in.readInt();
+ changed = in.readBoolean();
+
+ this.estimates = new HashMap<Long, Integer>();
+ if (in.readBoolean()) {
+ int num = in.readInt();
+ for (int i = 0; i < num; ++i) {
+ long key = in.readLong();
+ int value = in.readInt();
+ estimates.put(key, value);
+ }
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ out.writeDouble(core);
+ out.writeBoolean(changed);
+
+ if (this.estimates == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ out.writeInt(this.estimates.size());
+
+ for (Map.Entry<Long, Integer> entry : estimates.entrySet()) {
+ out.writeLong(entry.getKey());
+ out.writeInt(entry.getValue());
+ }
+ }
+ }
+
+ public int computeEstimate() {
+ int old = this.core;
+ double[] count = new double[this.core + 1];
+
+ for (Map.Entry<Long, Integer> entry : this.estimates.entrySet()) {
+ logger.info("Processing " + entry.getKey() + ": " + entry.getValue());
+ double j = Math.min(this.core, entry.getValue().doubleValue());
+ logger.info("Min: " + j);
+ count[(int) j] = count[(int) j] + 1;
+ }
+
+ logger.info("Count before");
+ int i;
+ for (i = 0; i < count.length; i++) {
+ logger.info(i + " " + count[i]);
+ }
+
+ for (i = this.core; i > 1; i--)
+ count[i - 1] = count[i - 1] + count[i];
+
+ logger.info("Count after");
+ for (i = 0; i < count.length; i++) {
+ logger.info(i + " " + count[i]);
+ }
+
+ i = this.core;
+ while ((i > 1) && (count[i] < i)) {
+ logger.info("Decrementing" + i + " down one because " + count[i]
+ + " is less than that");
+ i = i - 1;
+ }
+ logger.info("Loop terminated: i: " + i + " and count[i] = " + count[i]);
+
+ if (i != old) {
+ logger.info("New Core Estimate: " + i + "\n");
+ }
+ return i;
+ }
+
+ @Override
+ public void compute(Iterable<KCoreMessage> msgs) throws IOException {
+ if (this.getSuperstepCount() == 0) {
+ this.core = getEdges().size();
+
+ for (Edge<LongWritable, LongWritable> edge : getEdges()) {
+ sendMessage(edge, new KCoreMessage(getVertexID().get(),
+ getCore()));
+ }
+
+ } else {
+ logger.info("getSuperstepCount = " + getSuperstepCount()
+ + " vertex = " + getVertexID() + " Core = " + getCore());
+
+ List<KCoreMessage> messages = Lists.newArrayList(msgs);
+ if (this.getSuperstepCount() == 1) {
+ for (KCoreMessage message : messages) {
+ estimates.put(message.getVertexID(),
+ (Integer.MAX_VALUE));
+ }
+ }
+
+ logger.info(getVertexID() + " got estimates of: ");
+ for (KCoreMessage message : messages) {
+
+ logger.info("Processing message from "
+ + message.getVertexID());
+
+ double temp = getNeighborEstimate(message.getVertexID());
+
+ if (message.getCore() < temp) {
+ setNeighborNewEstimate(message.getVertexID(),
+ message.getCore());
+
+ int t = computeEstimate();
+
+ if (t < getCore()) {
+ logger.info("Setting new core value! \n\n");
+ setCore(t);
+ setChanged(true);
+ }
+ }
+ }
+ logger.info("Done recomputing estimate for node " + getVertexID());
+
+ if (!isChanged()) {
+ this.voteToHalt();
+ } else {
+ for (Edge<LongWritable, LongWritable> edge : getEdges()) {
+ sendMessage(edge, new KCoreMessage(getVertexID().get(),
+ getCore()));
+ }
+ setChanged(false);
+ }
+ }
+}
+
+
+}
Added:
hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexReader.java
URL:
http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexReader.java?rev=1669291&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexReader.java
(added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexReader.java
Thu Mar 26 08:27:24 2015
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.kcore;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.graph.Edge;
+import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
+
+/**
+* VertexInputFormat for the KCore algorithm
+* specified in tab-delimited text file format.
+*/
+public class KCoreVertexReader
+ extends
+ VertexInputReader<LongWritable, Text, LongWritable, LongWritable,
KCoreMessage> {
+
+ @Override
+ public boolean parseVertex(LongWritable key, Text value,
+ Vertex<LongWritable, LongWritable, KCoreMessage> vertex) throws
Exception {
+ String[] vertices = value.toString().split("\t");
+ List<Edge<LongWritable, LongWritable>> edges = new
ArrayList<Edge<LongWritable, LongWritable>>();
+
+ for(int i=1; i<vertices.length; i++){
+ LongWritable destID = new LongWritable(Long.parseLong(vertices[i]));
+ edges.add(new Edge<LongWritable, LongWritable>(destID, new
LongWritable(0)));
+ }
+
+ vertex.setEdges(edges);
+ vertex.setValue(new KCoreMessage());
+ vertex.setVertexID(new LongWritable(Long.parseLong(vertices[0])));
+ return true;
+ }
+}
Added:
hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexWriter.java
URL:
http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexWriter.java?rev=1669291&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexWriter.java
(added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexWriter.java
Thu Mar 26 08:27:24 2015
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.kcore;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.graph.GraphJobMessage;
+import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexOutputWriter;
+
+public class KCoreVertexWriter
+ implements
+ VertexOutputWriter<LongWritable, IntWritable, LongWritable, LongWritable,
KCoreMessage> {
+
+ @Override
+ public void setup(Configuration conf) {
+
+ }
+
+ @Override
+ public void write(
+ Vertex<LongWritable, LongWritable, KCoreMessage> vertex,
+ BSPPeer<Writable, Writable, LongWritable, IntWritable, GraphJobMessage>
peer)
+ throws IOException {
+ peer.write(vertex.getVertexID(),
+ new IntWritable(((KCoreVertex) vertex).getCore()));
+ }
+}