Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Hama Wiki" for change 
notification.

The "DynamicGraphs" page has been changed by AnastasisAndronidis:
https://wiki.apache.org/hama/DynamicGraphs?action=diff&rev1=3&rev2=4

- == Dynamic Addition and Removal of Vertexes in Graph API ==
+ == Adding and Removing Vertices ==
  
- === Introduction ===
+ === Description ===
  Nowadays more and more people turn to distributed environments to store and 
analyze Big Data. As a result, there is a growing need for both efficiency and 
new features. Hama is an emerging project that gives researchers and analysts 
a way to handle large amounts of data through the BSP (Bulk Synchronous 
Parallel) computation model.
   
  A very interesting feature of the Hama project is the Graph API for graph 
analysis. A lot of scientists and companies represent and manage their data 
with the use of one or more of the many different kinds of graphs (e.g. 
incremental graphs).
   
  This article presents some new features of the Hama Graph API that let you 
change a graph dynamically at run time. It also serves as technical 
documentation of the internals of those features.
  
- === Description ===
- At the time being, we will discuss only two different operations (addition 
and deletion) from both points of the end-user and internal architecture.
+ === Introduction ===
+ For the time being, we discuss only two operations, the addition and 
deletion of vertices. We first give an example of how to use this feature and 
then describe the internal implementation.
  
  Some points to keep in mind:
   1. Additions and deletions happen after a superstep. ''We provide 
methods inside a vertex instance that create or delete a vertex''
+  2. The vertex API is built on top of BSP peers. That means that each node of 
your cluster contains a specific number of BSP peers, and each BSP peer in turn 
contains multiple vertices.
-  2. New vertexes need to be distributed through partitioner to be placed on 
right peers.
+  3. New vertices need to be distributed through the partitioner so that they 
are placed on the right peers.
-  3. Keep in mind that a new created vertex and old vertexes, will have the 
same super step counter. ''Various algorithms change their behavior though 
time, an example with an internal counter to serve as a state is needed''
+  4. New and old vertices have the same superstep counter. ''Various 
algorithms change their behavior over time. In such a case, you need to 
maintain your own state counter''
+ 
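Point 4 can be sketched as follows, assuming a vertex keeps a private counter of the supersteps it has actually taken part in. The classes `StateCounterSketch` and `AgingVertex` are hypothetical stand-ins written in plain Java; they are not the Hama `Vertex` class.

```java
// Sketch only: a per-vertex state counter, independent of the global one.
final class StateCounterSketch {

  static final class AgingVertex {
    private int localSteps = 0; // supersteps this vertex has computed in

    // Called once per superstep. The global counter is the same for all
    // vertices, so the phase is chosen from the local counter instead.
    String compute(long globalSuperstep) {
      String phase = (localSteps == 0) ? "init" : "work";
      localSteps++;
      return phase;
    }
  }

  public static void main(String[] args) {
    AgingVertex oldVertex = new AgingVertex();
    oldVertex.compute(0);
    oldVertex.compute(1);
    AgingVertex newVertex = new AgingVertex(); // added during superstep 1

    // Both vertices see global superstep 2, but behave differently.
    System.out.println("old: " + oldVertex.compute(2)); // old: work
    System.out.println("new: " + newVertex.compute(2)); // new: init
  }
}
```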
+ === User Example ===
+ ''The following code is part of the Graph examples. You can find it in 
org.apache.hama.examples.DynamicGraph''
+ 
+ This is an example of how to manipulate graphs dynamically. The input of this 
example is a number in each row. We assume that there is a vertex with ID 1, 
which is responsible for creating a sum vertex that will aggregate the values 
of the other vertices. During the aggregation, all other vertices send their 
values to the sum vertex and remove themselves. 
+ 
+ Input example:
+  * 1
+  * 2
+  * 3
+  * 4
+ 
+ Output example:
+  * sum  12 
+ 
+ (the result also includes the number of vertices, which is read in the last 
superstep through two different methods)
+ 
+ 
+ {{{
+ public class DynamicGraph {
+   
+   public static class GraphTextReader extends 
+       VertexInputReader<LongWritable, Text, Text, NullWritable, IntWritable> {
+ 
+     @Override
+     public boolean parseVertex(LongWritable key, Text value,
+             Vertex<Text, NullWritable, IntWritable> vertex) throws Exception {
+ 
+         vertex.setVertexID(value);
+         vertex.setValue(new IntWritable(Integer.parseInt(value.toString())));
+ 
+         return true;
+     }
+   }
+ 
+   public static class GraphVertex extends 
+       Vertex<Text, NullWritable, IntWritable> {
+     
+     private void createSumVertex() throws IOException {
+       if (this.getVertexID().toString().equals("1")) {
+         Text new_id = new Text("sum");
+         this.addVertex(new_id, new ArrayList<Edge<Text, NullWritable>>(),
+             new IntWritable(0));
+       }
+     }
+ 
+     private void sendAllValuesToSumAndRemove() throws IOException {
+       if (!this.getVertexID().toString().equals("sum")) {
+         this.sendMessage(new Text("sum"), this.getValue());
+         this.remove();
+       }
+     }
+ 
+     // this must run only on "sum" vertex
+     private void calculateSum(Iterable<IntWritable> msgs) throws IOException {
+       if (this.getVertexID().toString().equals("sum")) {
+         int s = 0;
+         for (IntWritable i : msgs) {
+           s += i.get();
+         }
+         s += this.getPeer().getCounter(GraphJobCounter.INPUT_VERTICES)
+             .getCounter();
+         s += this.getNumVertices();
+         this.setValue(new IntWritable(this.getValue().get() + s));
+       } else {
+         throw new UnsupportedOperationException(
+             "We have more vertices than we expected: "
+             + this.getVertexID() + " " + this.getValue());
+       }
+     }
+ 
+     @Override
+     public void compute(Iterable<IntWritable> msgs) throws IOException {
+       if (this.getSuperstepCount() == 0) {
+         createSumVertex();
+       } else if (this.getSuperstepCount() == 1) {
+         sendAllValuesToSumAndRemove();
+       } else if (this.getSuperstepCount() == 2) {
+         calculateSum(msgs);
+       } else if (this.getSuperstepCount() == 3) {
+         this.voteToHalt();
+       }
+     }
+   }
+ 
+   public static void main(String[] args) throws IOException, 
+         InterruptedException, ClassNotFoundException {
+     if (args.length != 2) {
+       printUsage();
+     }
+     HamaConfiguration conf = new HamaConfiguration(new Configuration());
+     GraphJob graphJob = createJob(args, conf);
+     long startTime = System.currentTimeMillis();
+     if (graphJob.waitForCompletion(true)) {
+       System.out.println("Job Finished in "
+           + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+     }
+   }
+ 
+   private static void printUsage() {
+     System.out.println("Usage: <input> <output>");
+     System.exit(-1);
+   }
+ 
+   private static GraphJob createJob(String[] args, HamaConfiguration conf)
+       throws IOException {
+     GraphJob graphJob = new GraphJob(conf, DynamicGraph.class);
+     graphJob.setJobName("Dynamic Graph");
+     graphJob.setVertexClass(GraphVertex.class);
+ 
+     graphJob.setInputPath(new Path(args[0]));
+     graphJob.setOutputPath(new Path(args[1]));
+ 
+     graphJob.setVertexIDClass(Text.class);
+     graphJob.setVertexValueClass(IntWritable.class);
+     graphJob.setEdgeValueClass(NullWritable.class);
+ 
+     graphJob.setInputFormat(TextInputFormat.class);
+ 
+     graphJob.setVertexInputReaderClass(GraphTextReader.class);
+     graphJob.setPartitioner(HashPartitioner.class);
+ 
+     graphJob.setOutputFormat(TextOutputFormat.class);
+     graphJob.setOutputKeyClass(Text.class);
+     graphJob.setOutputValueClass(IntWritable.class);
+ 
+     return graphJob;
+   }  
+ 
+ }
+ }}}
+ 
+ We start by creating a class that serves as a container for our example. 
In this case, the class is '''DynamicGraph'''.
+ 
+ '''DynamicGraph''' contains three important parts.
+ 
+ The first component is the declaration of the input reader. This class 
overrides the '''parseVertex''' method and creates vertex instances from an 
input file. In our case the input is a text file in which each line contains a 
number. This number is assigned as both the ID and the value of the vertex.
+ 
+ {{{
+   public static class GraphTextReader extends 
+       VertexInputReader<LongWritable, Text, Text, NullWritable, IntWritable> {
+ 
+     @Override
+     public boolean parseVertex(LongWritable key, Text value,
+             Vertex<Text, NullWritable, IntWritable> vertex) throws Exception {
+ 
+         vertex.setVertexID(value);
+         vertex.setValue(new IntWritable(Integer.parseInt(value.toString())));
+ 
+         return true;
+     }
+   }
+ }}}
+ 
+ The second component is the standard boilerplate to create and submit a 
'''GraphJob'''.
+ 
+ {{{
+   public static void main(String[] args) throws IOException, 
+         InterruptedException, ClassNotFoundException {
+     if (args.length != 2) {
+       printUsage();
+     }
+     HamaConfiguration conf = new HamaConfiguration(new Configuration());
+     GraphJob graphJob = createJob(args, conf);
+     long startTime = System.currentTimeMillis();
+     if (graphJob.waitForCompletion(true)) {
+       System.out.println("Job Finished in "
+           + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+     }
+   }
+ 
+   private static void printUsage() {
+     System.out.println("Usage: <input> <output>");
+     System.exit(-1);
+   }
+ 
+   private static GraphJob createJob(String[] args, HamaConfiguration conf)
+       throws IOException {
+     GraphJob graphJob = new GraphJob(conf, DynamicGraph.class);
+     graphJob.setJobName("Dynamic Graph");
+     graphJob.setVertexClass(GraphVertex.class);
+ 
+     graphJob.setInputPath(new Path(args[0]));
+     graphJob.setOutputPath(new Path(args[1]));
+ 
+     graphJob.setVertexIDClass(Text.class);
+     graphJob.setVertexValueClass(IntWritable.class);
+     graphJob.setEdgeValueClass(NullWritable.class);
+ 
+     graphJob.setInputFormat(TextInputFormat.class);
+ 
+     graphJob.setVertexInputReaderClass(GraphTextReader.class);
+     graphJob.setPartitioner(HashPartitioner.class);
+ 
+     graphJob.setOutputFormat(TextOutputFormat.class);
+     graphJob.setOutputKeyClass(Text.class);
+     graphJob.setOutputValueClass(IntWritable.class);
+ 
+     return graphJob;
+   }
+ }}}
+ 
+ The most important component is the '''GraphVertex''' class. This class is 
the heart of the program, as it contains the '''compute''' method.
+ 
+ {{{
+   public static class GraphVertex extends 
+       Vertex<Text, NullWritable, IntWritable> {
+     
+     private void createSumVertex() throws IOException {
+       if (this.getVertexID().toString().equals("1")) {
+         Text new_id = new Text("sum");
+         this.addVertex(new_id, new ArrayList<Edge<Text, NullWritable>>(),
+             new IntWritable(0));
+       }
+     }
+ 
+     private void sendAllValuesToSumAndRemove() throws IOException {
+       if (!this.getVertexID().toString().equals("sum")) {
+         this.sendMessage(new Text("sum"), this.getValue());
+         this.remove();
+       }
+     }
+ 
+     // this must run only on "sum" vertex
+     private void calculateSum(Iterable<IntWritable> msgs) throws IOException {
+       if (this.getVertexID().toString().equals("sum")) {
+         int s = 0;
+         for (IntWritable i : msgs) {
+           s += i.get();
+         }
+         s += this.getPeer().getCounter(GraphJobCounter.INPUT_VERTICES)
+             .getCounter();
+         s += this.getNumVertices();
+         this.setValue(new IntWritable(this.getValue().get() + s));
+       } else {
+         throw new UnsupportedOperationException(
+             "We have more vertices than we expected: "
+             + this.getVertexID() + " " + this.getValue());
+       }
+     }
+ 
+     @Override
+     public void compute(Iterable<IntWritable> msgs) throws IOException {
+       if (this.getSuperstepCount() == 0) {
+         createSumVertex();
+       } else if (this.getSuperstepCount() == 1) {
+         sendAllValuesToSumAndRemove();
+       } else if (this.getSuperstepCount() == 2) {
+         calculateSum(msgs);
+       } else if (this.getSuperstepCount() == 3) {
+         this.voteToHalt();
+       }
+     }
+   }
+ }}}
+ 
+ As we can see, the '''compute''' method calls three other methods. The 
'''createSumVertex''' method creates a new vertex with the ID ''sum'' and the 
value 0. Next, in '''sendAllValuesToSumAndRemove''', each vertex that runs this 
method sends its value to the ''sum'' vertex and then deletes itself by calling 
''this.remove();''. Finally, '''calculateSum''' sums the values of all vertices 
in the ''sum'' vertex. The interesting part of the last method is that it also 
adds the number of input vertices 
''this.getPeer().getCounter(GraphJobCounter.INPUT_VERTICES).getCounter()'' and 
the current number of vertices that exist in the running superstep 
''this.getNumVertices()''.
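The flow of the three methods can be traced with a small stand-alone simulation. This is a sketch in plain Java that does not use Hama: the class `DynamicGraphSimulation` and its `run` method are hypothetical, additions and removals are assumed to be applied at the barrier between supersteps, and the two vertex-count additions made by '''calculateSum''' are left out, so the result is only the sum of the input values.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Sketch only: simulates the three supersteps of the example on one machine.
final class DynamicGraphSimulation {

  // Assumes the input contains the value 1, as the example requires.
  static int run(int[] input) {
    Map<String, Integer> vertices = new TreeMap<>(); // vertex ID -> value
    for (int n : input) {
      vertices.put(String.valueOf(n), n);
    }
    Map<String, List<Integer>> inbox = new HashMap<>();

    for (int superstep = 0; superstep < 3; superstep++) {
      Map<String, Integer> toAdd = new HashMap<>();
      Set<String> toRemove = new HashSet<>();
      Map<String, List<Integer>> outbox = new HashMap<>();

      for (Map.Entry<String, Integer> v : vertices.entrySet()) {
        String id = v.getKey();
        if (superstep == 0 && id.equals("1")) {
          toAdd.put("sum", 0); // createSumVertex
        } else if (superstep == 1 && !id.equals("sum")) {
          // sendAllValuesToSumAndRemove
          outbox.computeIfAbsent("sum", k -> new ArrayList<>()).add(v.getValue());
          toRemove.add(id);
        } else if (superstep == 2 && id.equals("sum")) {
          int s = 0; // calculateSum, without the vertex counters
          for (int m : inbox.getOrDefault("sum", Collections.emptyList())) {
            s += m;
          }
          v.setValue(v.getValue() + s);
        }
      }

      // Barrier: graph changes and messages become visible only now.
      vertices.keySet().removeAll(toRemove);
      vertices.putAll(toAdd);
      inbox = outbox;
    }
    return vertices.get("sum");
  }

  public static void main(String[] args) {
    System.out.println(run(new int[] {1, 2, 3, 4})); // prints 10
  }
}
```

Buffering the changes until the end of the loop body is what keeps every vertex in a superstep working on the same view of the graph.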
  
  === Implementation Details ===
  The `GraphJobRunner` class contains the major steps of the graph 
computation. It starts with an initial setup, where the vertices are loaded into 
memory. Then comes the main computation loop, where the supersteps occur. 
Finally there is the cleanup, where the graph and the results are 
written to HDFS.
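The three phases can be outlined with the following sketch. It is not the code of `GraphJobRunner`: the class `RunnerLifecycleSketch`, the interface `SimpleVertex` and the log strings are hypothetical, messaging and HDFS output are omitted, and for simplicity every vertex is computed in every superstep until all of them vote to halt in the same superstep.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: setup, superstep loop and cleanup in their simplest form.
final class RunnerLifecycleSketch {

  // Hypothetical vertex: compute returns true when it votes to halt.
  interface SimpleVertex {
    boolean compute(long superstep);
  }

  static List<String> run(List<SimpleVertex> input) {
    List<String> log = new ArrayList<>();

    // 1. Setup: load the vertices into memory.
    List<SimpleVertex> vertices = new ArrayList<>(input);
    log.add("setup: " + vertices.size() + " vertices");

    // 2. Main loop: one iteration per superstep.
    long superstep = 0;
    boolean anyActive = true;
    while (anyActive) {
      anyActive = false;
      for (SimpleVertex v : vertices) {
        if (!v.compute(superstep)) {
          anyActive = true; // at least one vertex wants to continue
        }
      }
      log.add("superstep " + superstep);
      superstep++;
    }

    // 3. Cleanup: the place where results would be written out.
    log.add("cleanup after " + superstep + " supersteps");
    return log;
  }

  public static void main(String[] args) {
    List<SimpleVertex> vertices = new ArrayList<>();
    vertices.add(s -> s >= 3); // halts at superstep 3, like the example
    for (String line : run(vertices)) {
      System.out.println(line);
    }
  }
}
```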
