Author: jghoman
Date: Mon Nov 28 20:51:55 2011
New Revision: 1207593

URL: http://svn.apache.org/viewvc?rev=1207593&view=rev
Log:
GIRAPH-51. Provide unit testing tool for Giraph algorithms. Contributed by 
Sebastian Schelter.

Added:
    incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/
    incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/
    incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java
Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1207593&r1=1207592&r2=1207593&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Mon Nov 28 20:51:55 2011
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.70.0 - unreleased
 
+  GIRAPH-51: Provide unit testing tool for Giraph algorithms.
+  (Sebastian Schelter via jghoman)
+
   GIRAPH-89: Simplify boolean expressions in BspRecordReader.
   (shaunak via claudio)
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1207593&r1=1207592&r2=1207593&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Mon Nov 28 20:51:55 2011
@@ -84,6 +84,13 @@ public class GiraphJob extends Job {
      */
     public static final boolean SPLIT_MASTER_WORKER_DEFAULT = true;
 
+    /** Indicates whether this job is run in an internal unit test */
+    public static final String LOCAL_TEST_MODE =
+        "giraph.localTestMode";
+
+    /** not in local test mode per default */
+    public static final boolean LOCAL_TEST_MODE_DEFAULT = false;
+
     /**
      * Minimum percent of the maximum number of workers that have responded
      * in order to continue progressing. (float)

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1207593&r1=1207592&r2=1207593&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Mon Nov 28 20:51:55 2011
@@ -365,32 +365,35 @@ public class GraphMapper<I extends Writa
 
         // Do some initial setup (possibly starting up a Zookeeper service)
         context.setStatus("setup: Initializing Zookeeper services.");
-        Path[] fileClassPaths = DistributedCache.getLocalCacheArchives(conf);
-        String zkClasspath = null;
-        if(fileClassPaths == null) {
-            if(LOG.isInfoEnabled()) {
-                LOG.info("Distributed cache is empty. Assuming fatjar.");
-            }
-            String jarFile = context.getJar();
-            if (jarFile == null) {
-               jarFile = findContainingJar(getClass());
-            }
-            zkClasspath = jarFile.replaceFirst("file:", "");
-        } else {
-            StringBuilder sb = new StringBuilder();
-            sb.append(fileClassPaths[0]);
-
-            for (int i = 1; i < fileClassPaths.length; i++) {
-                sb.append(":");
-                sb.append(fileClassPaths[i]);
+        if (!conf.getBoolean(GiraphJob.LOCAL_TEST_MODE,
+                GiraphJob.LOCAL_TEST_MODE_DEFAULT)) {
+            Path[] fileClassPaths = DistributedCache.getLocalCacheArchives(conf);
+            String zkClasspath = null;
+            if(fileClassPaths == null) {
+                if(LOG.isInfoEnabled()) {
+                    LOG.info("Distributed cache is empty. Assuming fatjar.");
+                }
+                String jarFile = context.getJar();
+                if (jarFile == null) {
+                   jarFile = findContainingJar(getClass());
+                }
+                zkClasspath = jarFile.replaceFirst("file:", "");
+            } else {
+                StringBuilder sb = new StringBuilder();
+                sb.append(fileClassPaths[0]);
+
+                for (int i = 1; i < fileClassPaths.length; i++) {
+                    sb.append(":");
+                    sb.append(fileClassPaths[i]);
+                }
+                zkClasspath = sb.toString();
             }
-            zkClasspath = sb.toString();
-        }
 
-        if (LOG.isInfoEnabled()) {
-            LOG.info("setup: classpath @ " + zkClasspath);
+            if (LOG.isInfoEnabled()) {
+                LOG.info("setup: classpath @ " + zkClasspath);
+            }
+            conf.set(GiraphJob.ZOOKEEPER_JAR, zkClasspath);
         }
-        conf.set(GiraphJob.ZOOKEEPER_JAR, zkClasspath);
         String serverPortList =
             conf.get(GiraphJob.ZOOKEEPER_LIST, "");
         if (serverPortList == "") {

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java?rev=1207593&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java Mon Nov 28 20:51:55 2011
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * a base class for running internal tests on a vertex
+ *
+ * Extending classes only have to invoke the run() method to test their vertex. All data
+ * is written to a local tmp directory that is removed afterwards. A local zookeeper
+ * instance is started in an extra thread and shutdown at the end.
+ *
+ * heavily inspired from Apache Mahout's MahoutTestCase
+ */
+public class InternalVertexRunner {
+
+    public static final int LOCAL_ZOOKEEPER_PORT = 22182;
+
+    private InternalVertexRunner() {
+    }
+
+    /**
+     *  Attempts to run the vertex internally in the current JVM, reading from and writing to a
+     *  temporary folder on local disk. Will start an own zookeeper instance.
+     *
+     * @param vertexClass the vertex class to instantiate
+     * @param vertexInputFormatClass the inputformat to use
+     * @param vertexOutputFormatClass the outputformat to use
+     * @param params a map of parameters to add to the hadoop configuration
+     * @param data linewise input data
+     * @return linewise output data
+     * @throws Exception
+     */
+    public static Iterable<String> run(Class<?> vertexClass,
+            Class<?> vertexInputFormatClass, Class<?> vertexOutputFormatClass,
+            Map<String,String> params, String... data) throws Exception {
+
+        File tmpDir = null;
+        try {
+            /* prepare input file, output folder and zookeeper folder */
+            tmpDir = createTestDir(vertexClass);
+            File inputFile = createTempFile(tmpDir, "graph.txt");
+            File outputDir = createTempDir(tmpDir, "output");
+            File zkDir = createTempDir(tmpDir, "zooKeeper");
+
+            /* write input data to disk */
+            writeLines(inputFile, data);
+
+            /* create and configure the job to run the vertex */
+            GiraphJob job = new GiraphJob(vertexClass.getName());
+            job.setVertexClass(vertexClass);
+            job.setVertexInputFormatClass(vertexInputFormatClass);
+            job.setVertexOutputFormatClass(vertexOutputFormatClass);
+
+            job.setWorkerConfiguration(1, 1, 100.0f);
+            Configuration conf = job.getConfiguration();
+            conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false);
+            conf.setBoolean(GiraphJob.LOCAL_TEST_MODE, true);
+            conf.set(GiraphJob.ZOOKEEPER_LIST, "localhost:" +
+                    String.valueOf(LOCAL_ZOOKEEPER_PORT));
+
+            for (Map.Entry<String,String> param : params.entrySet()) {
+                conf.set(param.getKey(), param.getValue());
+            }
+
+            FileInputFormat.addInputPath(job, new Path(inputFile.toString()));
+            FileOutputFormat.setOutputPath(job, new Path(outputDir.toString()));
+
+            /* configure a local zookeeper instance */
+            Properties zkProperties = new Properties();
+            zkProperties.setProperty("tickTime", "2000");
+            zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
+            zkProperties.setProperty("clientPort",
+                    String.valueOf(LOCAL_ZOOKEEPER_PORT));
+            zkProperties.setProperty("maxClientCnxns", "10000");
+            zkProperties.setProperty("minSessionTimeout", "10000");
+            zkProperties.setProperty("maxSessionTimeout", "100000");
+            zkProperties.setProperty("initLimit", "10");
+            zkProperties.setProperty("syncLimit", "5");
+            zkProperties.setProperty("snapCount", "50000");
+
+            QuorumPeerConfig qpConfig = new QuorumPeerConfig();
+            qpConfig.parseProperties(zkProperties);
+
+            /* create and run the zookeeper instance */
+            final ZooKeeperServerMain zookeeper = new ZooKeeperServerMain();
+            final ServerConfig zkConfig = new ServerConfig();
+            zkConfig.readFrom(qpConfig);
+
+            ExecutorService executorService = Executors.newSingleThreadExecutor();
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        zookeeper.runFromConfig(zkConfig);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+            try {
+                job.run(true);
+            } finally {
+                executorService.shutdown();
+            }
+
+            return Files.readLines(new File(outputDir, "part-m-00000"),
+                    Charsets.UTF_8);
+        } finally {
+            if (tmpDir != null) {
+                new DeletingVisitor().accept(tmpDir);
+            }
+        }
+    }
+
+    /* create a temporary folder that will be removed after the test */
+    private static final File createTestDir(Class<?> vertexClass)
+            throws IOException {
+        String systemTmpDir = System.getProperty("java.io.tmpdir");
+        long simpleRandomLong = (long) (Long.MAX_VALUE * Math.random());
+        File testTempDir = new File(systemTmpDir, "giraph-" +
+                vertexClass.getSimpleName() + '-' + simpleRandomLong);
+        if (!testTempDir.mkdir()) {
+            throw new IOException("Could not create " + testTempDir);
+        }
+        testTempDir.deleteOnExit();
+        return testTempDir;
+    }
+
+    private static final File createTempFile(File parent, String name)
+            throws IOException {
+        return createTestTempFileOrDir(parent, name, false);
+    }
+
+    private static final File createTempDir(File parent, String name)
+            throws IOException {
+        File dir = createTestTempFileOrDir(parent, name, true);
+        dir.delete();
+        return dir;
+    }
+
+    private static File createTestTempFileOrDir(File parent, String name,
+            boolean dir) throws IOException {
+        File f = new File(parent, name);
+        f.deleteOnExit();
+        if (dir && !f.mkdirs()) {
+            throw new IOException("Could not make directory " + f);
+        }
+        return f;
+    }
+
+    private static void writeLines(File file, String... lines)
+            throws IOException {
+        Writer writer = Files.newWriter(file, Charsets.UTF_8);
+        try {
+            for (String line : lines) {
+                writer.write(line);
+                writer.write('\n');
+            }
+        } finally {
+            Closeables.closeQuietly(writer);
+        }
+    }
+
+    private static class DeletingVisitor implements FileFilter {
+        @Override
+        public boolean accept(File f) {
+            if (!f.isFile()) {
+                f.listFiles(this);
+            }
+            f.delete();
+            return false;
+        }
+    }
+
+}

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java?rev=1207593&r1=1207592&r2=1207593&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java Mon Nov 28 20:51:55 2011
@@ -19,6 +19,7 @@
 package org.apache.giraph.utils;
 
 import java.lang.reflect.Array;
+import java.lang.reflect.Field;
 import java.lang.reflect.GenericArrayType;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
@@ -122,4 +123,26 @@ public class ReflectionUtils {
         }
         return typeArgumentsAsClasses;
     }
+
+    /** try to directly set a (possibly private) field on an Object */
+    public static void setField(Object target, String fieldname, Object value)
+            throws NoSuchFieldException, IllegalAccessException {
+        Field field = findDeclaredField(target.getClass(), fieldname);
+        field.setAccessible(true);
+        field.set(target, value);
+    }
+
+    /** find a declared field in a class or one of its super classes */
+    private static Field findDeclaredField(Class<?> inClass, String fieldname)
+            throws NoSuchFieldException {
+        while (!Object.class.equals(inClass)) {
+            for (Field field : inClass.getDeclaredFields()) {
+                if (field.getName().equalsIgnoreCase(fieldname)) {
+                    return field;
+                }
+            }
+            inClass = inClass.getSuperclass();
+        }
+        throw new NoSuchFieldException();
+    }
 }

Added: incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java?rev=1207593&view=auto
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java (added)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java Mon Nov 28 20:51:55 2011
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import junit.framework.TestCase;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.giraph.utils.MockUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.mockito.Mockito;
+
+import java.util.Map;
+
+/** contains a simple unit test for {@link SimpleShortestPathsVertex} */
+public class SimpleShortestPathVertexTest extends TestCase {
+
+    /** test the behavior when a shorter path to a vertex has been found */
+    public void testOnShorterPathFound() throws Exception {
+
+        SimpleShortestPathsVertex vertex = new SimpleShortestPathsVertex();
+        vertex.addEdge(new LongWritable(10L), new FloatWritable(2.5f));
+        vertex.addEdge(new LongWritable(20L), new FloatWritable(0.5f));
+
+        MockUtils.MockedEnvironment<LongWritable, DoubleWritable, FloatWritable,
+                DoubleWritable> env = MockUtils.prepareVertex(vertex, 1L,
+                new LongWritable(7L), new DoubleWritable(Double.MAX_VALUE),
+                false);
+
+        Mockito.when(env.getConfiguration().getLong(
+                SimpleShortestPathsVertex.SOURCE_ID,
+                SimpleShortestPathsVertex.SOURCE_ID_DEFAULT)).thenReturn(2L);
+
+        vertex.compute(Lists.newArrayList(new DoubleWritable(2),
+                new DoubleWritable(1.5)).iterator());
+
+        assertTrue(vertex.isHalted());
+        assertEquals(1.5, vertex.getVertexValue().get());
+
+        env.verifyMessageSent(new LongWritable(10L), new DoubleWritable(4));
+        env.verifyMessageSent(new LongWritable(20L), new DoubleWritable(2));
+    }
+
+    /** test the behavior when a new, but not shorter path to a vertex has been found */
+    public void testOnNoShorterPathFound() throws Exception {
+
+        SimpleShortestPathsVertex vertex = new SimpleShortestPathsVertex();
+        vertex.addEdge(new LongWritable(10L), new FloatWritable(2.5f));
+        vertex.addEdge(new LongWritable(20L), new FloatWritable(0.5f));
+
+        MockUtils.MockedEnvironment<LongWritable, DoubleWritable, FloatWritable,
+                DoubleWritable> env = MockUtils.prepareVertex(vertex, 1L,
+                new LongWritable(7L), new DoubleWritable(0.5), false);
+
+        Mockito.when(env.getConfiguration().getLong(
+                SimpleShortestPathsVertex.SOURCE_ID,
+                SimpleShortestPathsVertex.SOURCE_ID_DEFAULT)).thenReturn(2L);
+
+        vertex.compute(Lists.newArrayList(new DoubleWritable(2),
+                new DoubleWritable(1.5)).iterator());
+
+        assertTrue(vertex.isHalted());
+        assertEquals(0.5, vertex.getVertexValue().get());
+
+        env.verifyNoMessageSent();
+    }
+
+    /** a local integration test on toy data */
+    public void testToyData() throws Exception {
+
+        /* a small four vertex graph */
+        String[] graph = new String[] {
+                "[1,0,[[2,1],[3,3]]]",
+                "[2,0,[[3,1],[4,10]]]",
+                "[3,0,[[4,2]]]",
+                "[4,0,[]]" };
+
+        /* start from vertex 1 */
+        Map<String,String> params = Maps.newHashMap();
+        params.put(SimpleShortestPathsVertex.SOURCE_ID, "1");
+
+        /* run internally */
+        Iterable<String> results = InternalVertexRunner.run(
+                SimpleShortestPathsVertex.class,
+                SimpleShortestPathsVertex.
+                        SimpleShortestPathsVertexInputFormat.class,
+                SimpleShortestPathsVertex.
+                        SimpleShortestPathsVertexOutputFormat.class,
+                params, graph);
+
+        Map<Long, Double> distances = parseDistances(results);
+
+        /* verify results */
+        assertNotNull(distances);
+        assertEquals(4, distances.size());
+        assertEquals(0.0, distances.get(1L));
+        assertEquals(1.0, distances.get(2L));
+        assertEquals(2.0, distances.get(3L));
+        assertEquals(4.0, distances.get(4L));
+    }
+
+    private Map<Long,Double> parseDistances(Iterable<String> results) {
+        Map<Long,Double> distances =
+                Maps.newHashMapWithExpectedSize(Iterables.size(results));
+        for (String line : results) {
+            try {
+                JSONArray jsonVertex = new JSONArray(line);
+                distances.put(jsonVertex.getLong(0), jsonVertex.getDouble(1));
+            } catch (JSONException e) {
+                throw new IllegalArgumentException(
+                    "Couldn't get vertex from line " + line, e);
+            }
+        }
+        return distances;
+    }
+}

Added: incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java?rev=1207593&view=auto
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java (added)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java Mon Nov 28 20:51:55 2011
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.comm.WorkerCommunications;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.mockito.Mockito;
+
+/** simplify mocking for unit testing vertices */
+public class MockUtils {
+
+    private MockUtils() {
+    }
+
+    /**
+     * mocks and holds  "environment objects" that are injected into a vertex
+     *
+     * @param <I> vertex id
+     * @param <V> vertex data
+     * @param <E> edge data
+     * @param <M> message data
+     */
+    public static class MockedEnvironment<I extends WritableComparable,
+            V extends Writable, E extends Writable, M extends Writable> {
+
+        private final GraphState graphState;
+        private final Mapper.Context context;
+        private final Configuration conf;
+        private final WorkerCommunications communications;
+
+        public MockedEnvironment() {
+            graphState = Mockito.mock(GraphState.class);
+            context = Mockito.mock(Mapper.Context.class);
+            conf = Mockito.mock(Configuration.class);
+            communications = Mockito.mock(WorkerCommunications.class);
+        }
+
+        /** the injected graph state */
+        public GraphState getGraphState() {
+            return graphState;
+        }
+
+        /** the injected mapper context  */
+        public Mapper.Context getContext() {
+            return context;
+        }
+
+        /** the injected hadoop configuration */
+        public Configuration getConfiguration() {
+            return conf;
+        }
+
+        /** the injected worker communications */
+        public WorkerCommunications getCommunications() {
+            return communications;
+        }
+
+        /** assert that the test vertex message has been sent to a particular vertex */
+        public void verifyMessageSent(I targetVertexId, M message) {
+            Mockito.verify(communications).sendMessageReq(targetVertexId,
+                    message);
+        }
+
+        /** assert that the test vertex has sent no message to a particular vertex */
+        public void verifyNoMessageSent() {
+            Mockito.verifyZeroInteractions(communications);
+        }
+    }
+
+    /**
+     * prepare a vertex for use in a unit test by setting its internal state and injecting mocked
+     * dependencies,
+     *
+     * @param vertex
+     * @param superstep the superstep to emulate
+     * @param vertexId initial vertex id
+     * @param vertexValue initial vertex value
+     * @param isHalted initial halted state of the vertex
+     * @param <I> vertex id
+     * @param <V> vertex data
+     * @param <E> edge data
+     * @param <M> message data
+     * @return
+     * @throws Exception
+     */
+    public static <I extends WritableComparable, V extends Writable,
+            E extends Writable, M extends Writable>
+            MockedEnvironment<I, V, E, M> prepareVertex(
+            Vertex<I, V, E, M> vertex, long superstep, I vertexId,
+            V vertexValue, boolean isHalted) throws Exception {
+
+        MockedEnvironment<I, V, E, M>  env =
+                new MockedEnvironment<I, V, E, M>();
+
+        Mockito.when(env.getGraphState().getSuperstep()).thenReturn(superstep);
+        Mockito.when(env.getGraphState().getContext())
+                .thenReturn(env.getContext());
+        Mockito.when(env.getContext().getConfiguration())
+                .thenReturn(env.getConfiguration());
+        Mockito.when(env.getGraphState().getWorkerCommunications())
+                .thenReturn(env.getCommunications());
+
+        ReflectionUtils.setField(vertex, "vertexId", vertexId);
+        ReflectionUtils.setField(vertex, "vertexValue", vertexValue);
+        ReflectionUtils.setField(vertex, "graphState", env.getGraphState());
+        ReflectionUtils.setField(vertex, "halt", isHalted);
+
+        return env;
+    }
+}


Reply via email to