Author: jghoman
Date: Mon Nov 28 20:51:55 2011
New Revision: 1207593
URL: http://svn.apache.org/viewvc?rev=1207593&view=rev
Log:
GIRAPH-51. Provide unit testing tool for Giraph algorithms. Contributed by
Sebastian Schelter.
Added:
incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/
incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java
Modified:
incubator/giraph/trunk/CHANGELOG
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
Modified: incubator/giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1207593&r1=1207592&r2=1207593&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Mon Nov 28 20:51:55 2011
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.70.0 - unreleased
+ GIRAPH-51: Provide unit testing tool for Giraph algorithms.
+ (Sebastian Schelter via jghoman)
+
GIRAPH-89: Simplify boolean expressions in BspRecordReader.
(shaunak via claudio)
Modified:
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1207593&r1=1207592&r2=1207593&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Mon Nov 28 20:51:55 2011
@@ -84,6 +84,13 @@ public class GiraphJob extends Job {
*/
public static final boolean SPLIT_MASTER_WORKER_DEFAULT = true;
+ /** Indicates whether this job is run in an internal unit test */
+ public static final String LOCAL_TEST_MODE =
+ "giraph.localTestMode";
+
+ /** not in local test mode per default */
+ public static final boolean LOCAL_TEST_MODE_DEFAULT = false;
+
/**
* Minimum percent of the maximum number of workers that have responded
* in order to continue progressing. (float)
Modified:
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1207593&r1=1207592&r2=1207593&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Mon Nov 28 20:51:55 2011
@@ -365,32 +365,35 @@ public class GraphMapper<I extends Writa
// Do some initial setup (possibly starting up a Zookeeper service)
context.setStatus("setup: Initializing Zookeeper services.");
- Path[] fileClassPaths = DistributedCache.getLocalCacheArchives(conf);
- String zkClasspath = null;
- if(fileClassPaths == null) {
- if(LOG.isInfoEnabled()) {
- LOG.info("Distributed cache is empty. Assuming fatjar.");
- }
- String jarFile = context.getJar();
- if (jarFile == null) {
- jarFile = findContainingJar(getClass());
- }
- zkClasspath = jarFile.replaceFirst("file:", "");
- } else {
- StringBuilder sb = new StringBuilder();
- sb.append(fileClassPaths[0]);
-
- for (int i = 1; i < fileClassPaths.length; i++) {
- sb.append(":");
- sb.append(fileClassPaths[i]);
+ if (!conf.getBoolean(GiraphJob.LOCAL_TEST_MODE,
+ GiraphJob.LOCAL_TEST_MODE_DEFAULT)) {
+ Path[] fileClassPaths = DistributedCache.getLocalCacheArchives(conf);
+ String zkClasspath = null;
+ if(fileClassPaths == null) {
+ if(LOG.isInfoEnabled()) {
+ LOG.info("Distributed cache is empty. Assuming fatjar.");
+ }
+ String jarFile = context.getJar();
+ if (jarFile == null) {
+ jarFile = findContainingJar(getClass());
+ }
+ zkClasspath = jarFile.replaceFirst("file:", "");
+ } else {
+ StringBuilder sb = new StringBuilder();
+ sb.append(fileClassPaths[0]);
+
+ for (int i = 1; i < fileClassPaths.length; i++) {
+ sb.append(":");
+ sb.append(fileClassPaths[i]);
+ }
+ zkClasspath = sb.toString();
}
- zkClasspath = sb.toString();
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("setup: classpath @ " + zkClasspath);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("setup: classpath @ " + zkClasspath);
+ }
+ conf.set(GiraphJob.ZOOKEEPER_JAR, zkClasspath);
}
- conf.set(GiraphJob.ZOOKEEPER_JAR, zkClasspath);
String serverPortList =
conf.get(GiraphJob.ZOOKEEPER_LIST, "");
if (serverPortList == "") {
Added:
incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java?rev=1207593&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java Mon Nov 28 20:51:55 2011
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * a base class for running internal tests on a vertex
+ *
+ * Extending classes only have to invoke the run() method to test their vertex. All data
+ * is written to a local tmp directory that is removed afterwards. A local zookeeper
+ * instance is started in an extra thread and shutdown at the end.
+ *
+ * heavily inspired from Apache Mahout's MahoutTestCase
+ */
+public class InternalVertexRunner {
+
+ public static final int LOCAL_ZOOKEEPER_PORT = 22182;
+
+ private InternalVertexRunner() {
+ }
+
+ /**
+ * Attempts to run the vertex internally in the current JVM, reading from and writing to a
+ * temporary folder on local disk. Will start an own zookeeper instance.
+ *
+ * @param vertexClass the vertex class to instantiate
+ * @param vertexInputFormatClass the inputformat to use
+ * @param vertexOutputFormatClass the outputformat to use
+ * @param params a map of parameters to add to the hadoop configuration
+ * @param data linewise input data
+ * @return linewise output data
+ * @throws Exception
+ */
+ public static Iterable<String> run(Class<?> vertexClass,
+ Class<?> vertexInputFormatClass, Class<?> vertexOutputFormatClass,
+ Map<String,String> params, String... data) throws Exception {
+
+ File tmpDir = null;
+ try {
+ /* prepare input file, output folder and zookeeper folder */
+ tmpDir = createTestDir(vertexClass);
+ File inputFile = createTempFile(tmpDir, "graph.txt");
+ File outputDir = createTempDir(tmpDir, "output");
+ File zkDir = createTempDir(tmpDir, "zooKeeper");
+
+ /* write input data to disk */
+ writeLines(inputFile, data);
+
+ /* create and configure the job to run the vertex */
+ GiraphJob job = new GiraphJob(vertexClass.getName());
+ job.setVertexClass(vertexClass);
+ job.setVertexInputFormatClass(vertexInputFormatClass);
+ job.setVertexOutputFormatClass(vertexOutputFormatClass);
+
+ job.setWorkerConfiguration(1, 1, 100.0f);
+ Configuration conf = job.getConfiguration();
+ conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false);
+ conf.setBoolean(GiraphJob.LOCAL_TEST_MODE, true);
+ conf.set(GiraphJob.ZOOKEEPER_LIST, "localhost:" +
+ String.valueOf(LOCAL_ZOOKEEPER_PORT));
+
+ for (Map.Entry<String,String> param : params.entrySet()) {
+ conf.set(param.getKey(), param.getValue());
+ }
+
+ FileInputFormat.addInputPath(job, new Path(inputFile.toString()));
+ FileOutputFormat.setOutputPath(job, new Path(outputDir.toString()));
+
+ /* configure a local zookeeper instance */
+ Properties zkProperties = new Properties();
+ zkProperties.setProperty("tickTime", "2000");
+ zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
+ zkProperties.setProperty("clientPort",
+ String.valueOf(LOCAL_ZOOKEEPER_PORT));
+ zkProperties.setProperty("maxClientCnxns", "10000");
+ zkProperties.setProperty("minSessionTimeout", "10000");
+ zkProperties.setProperty("maxSessionTimeout", "100000");
+ zkProperties.setProperty("initLimit", "10");
+ zkProperties.setProperty("syncLimit", "5");
+ zkProperties.setProperty("snapCount", "50000");
+
+ QuorumPeerConfig qpConfig = new QuorumPeerConfig();
+ qpConfig.parseProperties(zkProperties);
+
+ /* create and run the zookeeper instance */
+ final ZooKeeperServerMain zookeeper = new ZooKeeperServerMain();
+ final ServerConfig zkConfig = new ServerConfig();
+ zkConfig.readFrom(qpConfig);
+
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ executorService.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ zookeeper.runFromConfig(zkConfig);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ try {
+ job.run(true);
+ } finally {
+ executorService.shutdown();
+ }
+
+ return Files.readLines(new File(outputDir, "part-m-00000"),
+ Charsets.UTF_8);
+ } finally {
+ if (tmpDir != null) {
+ new DeletingVisitor().accept(tmpDir);
+ }
+ }
+ }
+
+ /* create a temporary folder that will be removed after the test */
+ private static final File createTestDir(Class<?> vertexClass)
+ throws IOException {
+ String systemTmpDir = System.getProperty("java.io.tmpdir");
+ long simpleRandomLong = (long) (Long.MAX_VALUE * Math.random());
+ File testTempDir = new File(systemTmpDir, "giraph-" +
+ vertexClass.getSimpleName() + '-' + simpleRandomLong);
+ if (!testTempDir.mkdir()) {
+ throw new IOException("Could not create " + testTempDir);
+ }
+ testTempDir.deleteOnExit();
+ return testTempDir;
+ }
+
+ private static final File createTempFile(File parent, String name)
+ throws IOException {
+ return createTestTempFileOrDir(parent, name, false);
+ }
+
+ private static final File createTempDir(File parent, String name)
+ throws IOException {
+ File dir = createTestTempFileOrDir(parent, name, true);
+ dir.delete();
+ return dir;
+ }
+
+ private static File createTestTempFileOrDir(File parent, String name,
+ boolean dir) throws IOException {
+ File f = new File(parent, name);
+ f.deleteOnExit();
+ if (dir && !f.mkdirs()) {
+ throw new IOException("Could not make directory " + f);
+ }
+ return f;
+ }
+
+ private static void writeLines(File file, String... lines)
+ throws IOException {
+ Writer writer = Files.newWriter(file, Charsets.UTF_8);
+ try {
+ for (String line : lines) {
+ writer.write(line);
+ writer.write('\n');
+ }
+ } finally {
+ Closeables.closeQuietly(writer);
+ }
+ }
+
+ private static class DeletingVisitor implements FileFilter {
+ @Override
+ public boolean accept(File f) {
+ if (!f.isFile()) {
+ f.listFiles(this);
+ }
+ f.delete();
+ return false;
+ }
+ }
+
+}
Modified:
incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java?rev=1207593&r1=1207592&r2=1207593&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java Mon Nov 28 20:51:55 2011
@@ -19,6 +19,7 @@
package org.apache.giraph.utils;
import java.lang.reflect.Array;
+import java.lang.reflect.Field;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
@@ -122,4 +123,26 @@ public class ReflectionUtils {
}
return typeArgumentsAsClasses;
}
+
+ /** try to directly set a (possibly private) field on an Object */
+ public static void setField(Object target, String fieldname, Object value)
+ throws NoSuchFieldException, IllegalAccessException {
+ Field field = findDeclaredField(target.getClass(), fieldname);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+
+ /** find a declared field in a class or one of its super classes */
+ private static Field findDeclaredField(Class<?> inClass, String fieldname)
+ throws NoSuchFieldException {
+ while (!Object.class.equals(inClass)) {
+ for (Field field : inClass.getDeclaredFields()) {
+ if (field.getName().equalsIgnoreCase(fieldname)) {
+ return field;
+ }
+ }
+ inClass = inClass.getSuperclass();
+ }
+ throw new NoSuchFieldException();
+ }
}
Added:
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java?rev=1207593&view=auto
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java (added)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java Mon Nov 28 20:51:55 2011
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import junit.framework.TestCase;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.giraph.utils.MockUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.mockito.Mockito;
+
+import java.util.Map;
+
+/** contains a simple unit test for {@link SimpleShortestPathsVertex} */
+public class SimpleShortestPathVertexTest extends TestCase {
+
+ /** test the behavior when a shorter path to a vertex has been found */
+ public void testOnShorterPathFound() throws Exception {
+
+ SimpleShortestPathsVertex vertex = new SimpleShortestPathsVertex();
+ vertex.addEdge(new LongWritable(10L), new FloatWritable(2.5f));
+ vertex.addEdge(new LongWritable(20L), new FloatWritable(0.5f));
+
+ MockUtils.MockedEnvironment<LongWritable, DoubleWritable, FloatWritable,
+ DoubleWritable> env = MockUtils.prepareVertex(vertex, 1L,
+ new LongWritable(7L), new DoubleWritable(Double.MAX_VALUE),
+ false);
+
+ Mockito.when(env.getConfiguration().getLong(
+ SimpleShortestPathsVertex.SOURCE_ID,
+ SimpleShortestPathsVertex.SOURCE_ID_DEFAULT)).thenReturn(2L);
+
+ vertex.compute(Lists.newArrayList(new DoubleWritable(2),
+ new DoubleWritable(1.5)).iterator());
+
+ assertTrue(vertex.isHalted());
+ assertEquals(1.5, vertex.getVertexValue().get());
+
+ env.verifyMessageSent(new LongWritable(10L), new DoubleWritable(4));
+ env.verifyMessageSent(new LongWritable(20L), new DoubleWritable(2));
+ }
+
+ /** test the behavior when a new, but not shorter path to a vertex has been found */
+ public void testOnNoShorterPathFound() throws Exception {
+
+ SimpleShortestPathsVertex vertex = new SimpleShortestPathsVertex();
+ vertex.addEdge(new LongWritable(10L), new FloatWritable(2.5f));
+ vertex.addEdge(new LongWritable(20L), new FloatWritable(0.5f));
+
+ MockUtils.MockedEnvironment<LongWritable, DoubleWritable, FloatWritable,
+ DoubleWritable> env = MockUtils.prepareVertex(vertex, 1L,
+ new LongWritable(7L), new DoubleWritable(0.5), false);
+
+ Mockito.when(env.getConfiguration().getLong(
+ SimpleShortestPathsVertex.SOURCE_ID,
+ SimpleShortestPathsVertex.SOURCE_ID_DEFAULT)).thenReturn(2L);
+
+ vertex.compute(Lists.newArrayList(new DoubleWritable(2),
+ new DoubleWritable(1.5)).iterator());
+
+ assertTrue(vertex.isHalted());
+ assertEquals(0.5, vertex.getVertexValue().get());
+
+ env.verifyNoMessageSent();
+ }
+
+ /** a local integration test on toy data */
+ public void testToyData() throws Exception {
+
+ /* a small four vertex graph */
+ String[] graph = new String[] {
+ "[1,0,[[2,1],[3,3]]]",
+ "[2,0,[[3,1],[4,10]]]",
+ "[3,0,[[4,2]]]",
+ "[4,0,[]]" };
+
+ /* start from vertex 1 */
+ Map<String,String> params = Maps.newHashMap();
+ params.put(SimpleShortestPathsVertex.SOURCE_ID, "1");
+
+ /* run internally */
+ Iterable<String> results = InternalVertexRunner.run(
+ SimpleShortestPathsVertex.class,
+ SimpleShortestPathsVertex.
+ SimpleShortestPathsVertexInputFormat.class,
+ SimpleShortestPathsVertex.
+ SimpleShortestPathsVertexOutputFormat.class,
+ params, graph);
+
+ Map<Long, Double> distances = parseDistances(results);
+
+ /* verify results */
+ assertNotNull(distances);
+ assertEquals(4, distances.size());
+ assertEquals(0.0, distances.get(1L));
+ assertEquals(1.0, distances.get(2L));
+ assertEquals(2.0, distances.get(3L));
+ assertEquals(4.0, distances.get(4L));
+ }
+
+ private Map<Long,Double> parseDistances(Iterable<String> results) {
+ Map<Long,Double> distances =
+ Maps.newHashMapWithExpectedSize(Iterables.size(results));
+ for (String line : results) {
+ try {
+ JSONArray jsonVertex = new JSONArray(line);
+ distances.put(jsonVertex.getLong(0), jsonVertex.getDouble(1));
+ } catch (JSONException e) {
+ throw new IllegalArgumentException(
+ "Couldn't get vertex from line " + line, e);
+ }
+ }
+ return distances;
+ }
+}
Added:
incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java?rev=1207593&view=auto
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java (added)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java Mon Nov 28 20:51:55 2011
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.comm.WorkerCommunications;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.mockito.Mockito;
+
+/** simplify mocking for unit testing vertices */
+public class MockUtils {
+
+ private MockUtils() {
+ }
+
+ /**
+ * mocks and holds "environment objects" that are injected into a vertex
+ *
+ * @param <I> vertex id
+ * @param <V> vertex data
+ * @param <E> edge data
+ * @param <M> message data
+ */
+ public static class MockedEnvironment<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable> {
+
+ private final GraphState graphState;
+ private final Mapper.Context context;
+ private final Configuration conf;
+ private final WorkerCommunications communications;
+
+ public MockedEnvironment() {
+ graphState = Mockito.mock(GraphState.class);
+ context = Mockito.mock(Mapper.Context.class);
+ conf = Mockito.mock(Configuration.class);
+ communications = Mockito.mock(WorkerCommunications.class);
+ }
+
+ /** the injected graph state */
+ public GraphState getGraphState() {
+ return graphState;
+ }
+
+ /** the injected mapper context */
+ public Mapper.Context getContext() {
+ return context;
+ }
+
+ /** the injected hadoop configuration */
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ /** the injected worker communications */
+ public WorkerCommunications getCommunications() {
+ return communications;
+ }
+
+ /** assert that the test vertex message has been sent to a particular vertex */
+ public void verifyMessageSent(I targetVertexId, M message) {
+ Mockito.verify(communications).sendMessageReq(targetVertexId,
+ message);
+ }
+
+ /** assert that the test vertex has sent no message to a particular vertex */
+ public void verifyNoMessageSent() {
+ Mockito.verifyZeroInteractions(communications);
+ }
+ }
+
+ /**
+ * prepare a vertex for use in a unit test by setting its internal state and injecting mocked
+ * dependencies,
+ *
+ * @param vertex
+ * @param superstep the superstep to emulate
+ * @param vertexId initial vertex id
+ * @param vertexValue initial vertex value
+ * @param isHalted initial halted state of the vertex
+ * @param <I> vertex id
+ * @param <V> vertex data
+ * @param <E> edge data
+ * @param <M> message data
+ * @return
+ * @throws Exception
+ */
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable>
+ MockedEnvironment<I, V, E, M> prepareVertex(
+ Vertex<I, V, E, M> vertex, long superstep, I vertexId,
+ V vertexValue, boolean isHalted) throws Exception {
+
+ MockedEnvironment<I, V, E, M> env =
+ new MockedEnvironment<I, V, E, M>();
+
+ Mockito.when(env.getGraphState().getSuperstep()).thenReturn(superstep);
+ Mockito.when(env.getGraphState().getContext())
+ .thenReturn(env.getContext());
+ Mockito.when(env.getContext().getConfiguration())
+ .thenReturn(env.getConfiguration());
+ Mockito.when(env.getGraphState().getWorkerCommunications())
+ .thenReturn(env.getCommunications());
+
+ ReflectionUtils.setField(vertex, "vertexId", vertexId);
+ ReflectionUtils.setField(vertex, "vertexValue", vertexValue);
+ ReflectionUtils.setField(vertex, "graphState", env.getGraphState());
+ ReflectionUtils.setField(vertex, "halt", isHalted);
+
+ return env;
+ }
+}