Author: tommaso
Date: Sat Sep 21 06:44:05 2013
New Revision: 1525198
URL: http://svn.apache.org/r1525198
Log:
HAMA-732 - applied patch for integration with DM for OffHeapVerticesInfo
Added:
hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java
Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java?rev=1525198&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java Sat Sep 21 06:44:05 2013
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.directmemory.DirectMemory;
+import org.apache.directmemory.cache.CacheService;
+import org.apache.directmemory.memory.Pointer;
+import org.apache.directmemory.serialization.Serializer;
+import org.apache.directmemory.serialization.kryo.KryoSerializer;
+import org.apache.directmemory.utils.CacheValuesIterable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.util.ReflectionUtils;
+
+/**
+ * An off heap version of a {@link org.apache.hama.graph.Vertex} storage.
+ */
+public class OffHeapVerticesInfo<V extends WritableComparable<?>, E extends
Writable, M extends Writable>
+ implements VerticesInfo<V, E, M> {
+
+ public static final String DM_STRICT_ITERATOR = "dm.iterator.strict";
+ public static final String DM_BUFFERS = "dm.buffers";
+ public static final String DM_SIZE = "dm.size";
+ public static final String DM_CAPACITY = "dm.capacity";
+ public static final String DM_CONCURRENCY = "dm.concurrency";
+ public static final String DM_DISPOSAL_TIME = "dm.disposal.time";
+ public static final String DM_SERIALIZER = "dm.serializer";
+ public static final String DM_SORTED = "dm.sorted";
+
+ private CacheService<V, Vertex<V, E, M>> vertices;
+
+ private boolean strict;
+ private GraphJobRunner<V, E, M> runner;
+
+ @Override
+ public void init(GraphJobRunner<V, E, M> runner, Configuration conf,
TaskAttemptID attempt) throws IOException {
+ this.runner = runner;
+ this.strict = conf.getBoolean(DM_STRICT_ITERATOR, true);
+ DirectMemory<V, Vertex<V, E, M>> dm = new DirectMemory<V, Vertex<V, E,
M>>()
+ .setNumberOfBuffers(conf.getInt(DM_BUFFERS, 100))
+ .setSize(conf.getInt(DM_SIZE, 102400))
+
.setSerializer(ReflectionUtils.newInstance(conf.getClass(DM_SERIALIZER,
KryoSerializer.class, Serializer.class)))
+ .setDisposalTime(conf.getInt(DM_DISPOSAL_TIME, 3600000));
+ if (conf.getBoolean(DM_SORTED, true)) {
+ dm.setMap(new ConcurrentSkipListMap<V, Pointer<Vertex<V, E,
M>>>());
+ } else {
+ dm.setInitialCapacity(conf.getInt(DM_CAPACITY, 1000))
+ .setConcurrencyLevel(conf.getInt(DM_CONCURRENCY, 10));
+ }
+
+ this.vertices = dm.newCacheService();
+
+ }
+
+ @Override
+ public void cleanup(Configuration conf, TaskAttemptID attempt) throws
IOException {
+ vertices.dump();
+ }
+
+ public void addVertex(Vertex<V, E, M> vertex) {
+ vertices.put(vertex.getVertexID(), vertex);
+ }
+
+ @Override
+ public void finishAdditions() {
+ }
+
+ @Override
+ public void startSuperstep() throws IOException {
+ }
+
+ @Override
+ public void finishSuperstep() throws IOException {
+ }
+
+ @Override
+ public void finishVertexComputation(Vertex<V, E, M> vertex) throws
IOException {
+ vertices.put(vertex.getVertexID(), vertex);
+ }
+
+ public void clear() {
+ vertices.clear();
+ }
+
+ public int size() {
+ return (int) this.vertices.entries();
+ }
+
+ @Override
+ public IDSkippingIterator<V, E, M> skippingIterator() {
+ final Iterator<Vertex<V, E, M>> vertexIterator =
+ new CacheValuesIterable<V, Vertex<V, E, M>>(vertices,
strict).iterator();
+
+ return new IDSkippingIterator<V, E, M>() {
+ int currentIndex = 0;
+
+ Vertex<V, E, M> currentVertex = null;
+
+ @Override
+ public boolean hasNext(V e,
+
org.apache.hama.graph.IDSkippingIterator.Strategy strat) {
+ if (currentIndex < vertices.entries()) {
+
+ Vertex<V, E, M> next = vertexIterator.next();
+ while (!strat.accept(next, e)) {
+ currentIndex++;
+ }
+ currentVertex = next;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public Vertex<V, E, M> next() {
+ currentIndex++;
+ if (currentVertex.getRunner() == null) {
+ currentVertex.setRunner(runner);
+ }
+ return currentVertex;
+ }
+
+ };
+
+ }
+
+ @Override
+ public void removeVertex(V vertexID) {
+ throw new UnsupportedOperationException ("Not yet implemented");
+ }
+
+ @Override
+ public void finishRemovals() {
+ throw new UnsupportedOperationException ("Not yet implemented");
+ }
+
+}
Added: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java?rev=1525198&view=auto
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java (added)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java Sat Sep 21 06:44:05 2013
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.graph.example.PageRank.PageRankVertex;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestOffHeapVerticesInfo {
+
+ @Test
+ public void testOffHeapVerticesInfoLifeCycle() throws Exception {
+ OffHeapVerticesInfo<Text, NullWritable, DoubleWritable> info = new
OffHeapVerticesInfo<Text, NullWritable, DoubleWritable>();
+ Configuration conf = new Configuration();
+ conf.set(GraphJob.VERTEX_CLASS_ATTR, PageRankVertex.class.getName());
+ conf.set(GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR,
+ NullWritable.class.getName());
+ conf.set(GraphJob.VERTEX_ID_CLASS_ATTR, Text.class.getName());
+ conf.set(GraphJob.VERTEX_VALUE_CLASS_ATTR, DoubleWritable.class.getName());
+ GraphJobRunner.<Text, NullWritable, DoubleWritable> initClasses(conf);
+ TaskAttemptID attempt = new TaskAttemptID("123", 1, 1, 0);
+ try {
+ ArrayList<PageRankVertex> list = new ArrayList<PageRankVertex>();
+
+ for (int i = 0; i < 10; i++) {
+ PageRankVertex v = new PageRankVertex();
+ v.setVertexID(new Text(i + ""));
+ if (i % 2 == 0) {
+ v.setValue(new DoubleWritable(i * 2));
+ }
+ v.addEdge(new Edge<Text, NullWritable>(new Text((10 - i) + ""), null));
+
+ list.add(v);
+ }
+
+ info.init(null, conf, attempt);
+ for (PageRankVertex v : list) {
+ info.addVertex(v);
+ }
+
+ info.finishAdditions();
+
+ assertEquals(10, info.size());
+ // no we want to iterate and check if the result can properly be obtained
+
+ int index = 0;
+ IDSkippingIterator<Text, NullWritable, DoubleWritable> iterator = info
+ .skippingIterator();
+ while (iterator.hasNext()) {
+ Vertex<Text, NullWritable, DoubleWritable> next = iterator.next();
+ PageRankVertex pageRankVertex = list.get(index);
+ assertEquals(pageRankVertex.getVertexID().toString(), next
+ .getVertexID().toString());
+ if (index % 2 == 0) {
+ assertEquals((int) next.getValue().get(), index * 2);
+ } else {
+ assertNull(next.getValue());
+ }
+ assertEquals(next.isHalted(), false);
+ // check edges
+ List<Edge<Text, NullWritable>> edges = next.getEdges();
+ assertEquals(1, edges.size());
+ Edge<Text, NullWritable> edge = edges.get(0);
+ assertEquals(pageRankVertex.getEdges().get(0).getDestinationVertexID()
+ .toString(), edge.getDestinationVertexID().toString());
+ assertNull(edge.getValue());
+
+ index++;
+ }
+ assertEquals(index, list.size());
+ info.finishSuperstep();
+ // iterate again and compute so vertices change internally
+ iterator = info.skippingIterator();
+ info.startSuperstep();
+ while (iterator.hasNext()) {
+ Vertex<Text, NullWritable, DoubleWritable> next = iterator.next();
+ // override everything with constant 2
+ next.setValue(new DoubleWritable(2));
+ if (Integer.parseInt(next.getVertexID().toString()) == 3) {
+ next.voteToHalt();
+ }
+ info.finishVertexComputation(next);
+ }
+ info.finishSuperstep();
+ assertEquals(index, list.size());
+
+ } finally {
+ info.cleanup(conf, attempt);
+ }
+
+ }
+
+ @Test
+ public void testAdditionWithDefaults() throws Exception {
+ OffHeapVerticesInfo<Text, NullWritable, DoubleWritable> verticesInfo =
+ new OffHeapVerticesInfo<Text, NullWritable, DoubleWritable>();
+ Configuration conf = new Configuration();
+ verticesInfo.init(null, conf, null);
+ Vertex<Text, NullWritable, DoubleWritable> vertex = new PageRankVertex();
+ vertex.setVertexID(new Text("some-id"));
+ verticesInfo.addVertex(vertex);
+ assertTrue("added vertex could not be found in the cache",
verticesInfo.skippingIterator().hasNext());
+ }
+
+ @Test
+ public void testMassiveAdditionWithDefaults() throws Exception {
+ OffHeapVerticesInfo<Text, NullWritable, DoubleWritable> verticesInfo =
+ new OffHeapVerticesInfo<Text, NullWritable, DoubleWritable>();
+ Configuration conf = new Configuration();
+ verticesInfo.init(null, conf, null);
+ assertEquals("vertices info size should be 0 at startup", 0,
verticesInfo.size());
+ Random r = new Random();
+ int i = 10000;
+ for (int n = 0; n < i; n++) {
+ Vertex<Text, NullWritable, DoubleWritable> vertex = new PageRankVertex();
+ vertex.setVertexID(new Text(String.valueOf(r.nextInt())));
+ vertex.setValue(new DoubleWritable(r.nextDouble()));
+ verticesInfo.addVertex(vertex);
+ }
+ verticesInfo.finishAdditions();
+ assertEquals("vertices info size is not correct", i, verticesInfo.size());
+ }
+
+}