Author: jwills
Date: Mon Jan 20 20:08:12 2014
New Revision: 1559821

URL: http://svn.apache.org/r1559821
Log:
Add section on unit testing

Modified:
    crunch/site/trunk/content/user-guide.mdtext

Modified: crunch/site/trunk/content/user-guide.mdtext
URL: http://svn.apache.org/viewvc/crunch/site/trunk/content/user-guide.mdtext?rev=1559821&r1=1559820&r2=1559821&view=diff
==============================================================================
--- crunch/site/trunk/content/user-guide.mdtext (original)
+++ crunch/site/trunk/content/user-guide.mdtext Mon Jan 20 20:08:12 2014
@@ -61,6 +61,7 @@ Notice:   Licensed to the Apache Softwar
     1. [MRPipeline](#mrpipeline)
     1. [SparkPipeline](#sparkpipeline)
     1. [MemPipeline](#mempipeline)
+1. [Unit Testing Pipelines](#testing)
 
 <a name="intro"></a>
 ## Introduction to Crunch
@@ -1494,3 +1495,105 @@ on the read side. Often the best way to 
 `materialize()` method to get a reference to the contents of the in-memory 
collection and then verify them directly,
 without writing them out to disk.
 
+<a name="testing"></a>
+## Unit Testing Pipelines
+
+For production data pipelines, unit tests are an absolute must. The 
[MemPipeline](#mempipeline) implementation of the Pipeline
+interface has several tools to help developers create effective unit tests, 
which will be detailed in this section.
+
+### Unit Testing DoFns
+
+Many of the DoFn implementations, such as `MapFn` and `FilterFn`, are very 
easy to test, since they accept a single input
+and return a single output. For general-purpose DoFns, we need an instance of the [Emitter](apidocs/0.9.0/org/apache/crunch/Emitter.html)
+interface that we can pass to the DoFn's `process` method and then read back the values that the function wrote. Support
+for this pattern is provided by the 
[InMemoryEmitter](apidocs/0.9.0/org/apache/crunch/impl/mem/emit/InMemoryEmitter.html)
 class, which
+has a `List<T> getOutput()` method that can be used to read the values that 
were passed to the Emitter instance by a DoFn instance:
+
+       @Test
+       public void testToUpperCaseFn() {
+         InMemoryEmitter<String> emitter = new InMemoryEmitter<String>();
+         new ToUpperCaseFn().process("input", emitter);
+         assertEquals(ImmutableList.of("INPUT"), emitter.getOutput());
+       }
+
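To illustrate why a recording emitter matters for general-purpose functions that may emit zero, one, or many values per input, here is a minimal plain-Java sketch of the same testing pattern. It deliberately does not use the Crunch API: `SimpleEmitter`, `ListEmitter`, and `splitWords` are hypothetical stand-ins invented for this example, and `ListEmitter` only approximates the role that `InMemoryEmitter` plays above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-ins for illustration only; these are NOT Crunch classes.
class EmitterSketch {

  /** Hypothetical minimal emitter interface: a sink for output values. */
  interface SimpleEmitter<T> {
    void emit(T value);
  }

  /** Records every emitted value in a list so that a test can inspect it. */
  static class ListEmitter<T> implements SimpleEmitter<T> {
    private final List<T> output = new ArrayList<>();

    @Override
    public void emit(T value) {
      output.add(value);
    }

    public List<T> getOutput() {
      return output;
    }
  }

  /** Hypothetical function under test: emits one value per whitespace-separated word. */
  static void splitWords(String line, SimpleEmitter<String> emitter) {
    for (String word : line.trim().split("\\s+")) {
      if (!word.isEmpty()) {
        emitter.emit(word);
      }
    }
  }

  public static void main(String[] args) {
    ListEmitter<String> emitter = new ListEmitter<>();
    splitWords("unit testing pipelines", emitter); // emits three values
    splitWords("   ", emitter);                    // blank input emits nothing
    System.out.println(emitter.getOutput());       // prints [unit, testing, pipelines]
  }
}
```

Because the emitter simply accumulates values, one assertion on `getOutput()` covers both the number of emitted values and their order.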
+
+### Testing Complex DoFns and Pipelines
+
+Many of the DoFns we write involve more complex processing: they must be initialized and cleaned up, or they
+define Counters that track the inputs they receive. To ensure that our DoFns work properly across
+their entire lifecycle, it's best to use the [MemPipeline](#mempipeline) implementation to create in-memory instances of
+PCollections and PTables that contain a small amount of test data, and then apply our DoFns to those PCollections to test their
+functionality. We can easily retrieve the contents of any in-memory PCollection by calling its `Iterable<T> materialize()`
+method, which will return immediately. We can also check the values of any Counters that were incremented as the DoFns were
+executed against the test data by calling the static `getCounters()` method on the MemPipeline class, and reset
+those Counters between test runs by calling the static `clearCounters()` method:
+
+       public static class UpperCaseWithCounterFn extends DoFn<String, String> {
+         @Override
+         public void process(String input, Emitter<String> emitter) {
+           String upper = input.toUpperCase();
+           if (!upper.equals(input)) {
+             increment("UpperCase", "modified");
+           }
+           emitter.emit(upper);
+         }
+       }
+       
+       @Before
+       public void setUp() throws Exception {
+         MemPipeline.clearCounters();
+       }
+       
+       @Test
+       public void testToUpperCase_WithPipeline() {
+         PCollection<String> inputStrings = MemPipeline.collectionOf("a", "B", "c");
+         PCollection<String> upperCaseStrings = inputStrings.parallelDo(new UpperCaseWithCounterFn(), Writables.strings());
+         assertEquals(ImmutableList.of("A", "B", "C"), Lists.newArrayList(upperCaseStrings.materialize()));
+         assertEquals(2L, MemPipeline.getCounters().findCounter("UpperCase", "modified").getValue());
+       }
+
+### Designing Testable Data Pipelines
+
+In the same way that we try to [write testable 
code](http://misko.hevery.com/code-reviewers-guide/), we want to ensure that
+our data pipelines are written in a way that makes them easy to test. In 
general, you should try to break up complex pipelines
+into a number of function calls that perform a small set of operations on 
input PCollections and return one or more PCollections
+as a result. This makes it easy to swap in different PCollection 
implementations for testing and production runs.
+
+Let's look at an example, taken from one of Crunch's integration tests, that computes one iteration of the
+[PageRank](http://en.wikipedia.org/wiki/PageRank) algorithm:
+
+       // Each entry in the PTable represents a URL and its associated data for PageRank computations.
+       public static PTable<String, PageRankData> pageRank(PTable<String, PageRankData> input, final float d) {
+         PTypeFamily ptf = input.getTypeFamily();
+
+         // Compute the outbound page rank from each of the input pages.
+         PTable<String, Float> outbound = input.parallelDo(new DoFn<Pair<String, PageRankData>, Pair<String, Float>>() {
+           @Override
+           public void process(Pair<String, PageRankData> input, Emitter<Pair<String, Float>> emitter) {
+             PageRankData prd = input.second();
+             for (String link : prd.urls) {
+               emitter.emit(Pair.of(link, prd.propagatedScore()));
+             }
+           }
+         }, ptf.tableOf(ptf.strings(), ptf.floats()));
+       
+         // Update the PageRank for each URL.
+         return input.cogroup(outbound).mapValues(
+             new MapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() {
+               @Override
+               public PageRankData map(Pair<Collection<PageRankData>, Collection<Float>> input) {
+                 PageRankData prd = Iterables.getOnlyElement(input.first());
+                 Collection<Float> propagatedScores = input.second();
+                 float sum = 0.0f;
+                 for (Float s : propagatedScores) {
+                   sum += s;
+                 }
+                 return prd.next(d + (1.0f - d) * sum);
+               }
+             }, input.getValueType());
+       }
+
+By embedding our business logic inside of a static method that operates on PTables, we can easily unit test our PageRank
+computations, which combine custom DoFns with Crunch's built-in `cogroup` operation: we use the [MemPipeline](#mempipeline)
+implementation to create test data sets that we can easily verify by hand. The same logic can then be executed on
+a distributed data set using either the [MRPipeline](#mrpipeline) or [SparkPipeline](#sparkpipeline) implementation.
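
As an example of a test data set that can be verified by hand, the sketch below applies the same update rule, `d + (1 - d) * sum`, to a three-page graph in plain Java. It does not use the Crunch API. The class `PageRankSketch`, its `iterate` method, and the map-based representation are hypothetical, and the sketch assumes that each page splits its current score evenly across its outbound links, which the listing above does not spell out for `propagatedScore()`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the update rule; it does not use the Crunch API.
class PageRankSketch {

  /**
   * Runs one iteration over an in-memory graph.
   * ASSUMPTION: each page splits its current score evenly across its outbound links.
   * Every key in `links` must also be a key in `scores`.
   */
  static Map<String, Float> iterate(Map<String, Float> scores,
                                    Map<String, List<String>> links,
                                    float d) {
    // Sum the score propagated to each URL (the role of `outbound` plus the cogroup).
    Map<String, Float> incoming = new HashMap<>();
    for (Map.Entry<String, List<String>> entry : links.entrySet()) {
      List<String> targets = entry.getValue();
      if (targets.isEmpty()) {
        continue; // a page without outbound links propagates nothing
      }
      float share = scores.get(entry.getKey()) / targets.size();
      for (String target : targets) {
        incoming.merge(target, share, Float::sum);
      }
    }
    // Apply the same formula as the MapFn in the listing: d + (1 - d) * sum.
    Map<String, Float> next = new HashMap<>();
    for (String url : scores.keySet()) {
      float sum = incoming.getOrDefault(url, 0.0f);
      next.put(url, d + (1.0f - d) * sum);
    }
    return next;
  }

  public static void main(String[] args) {
    Map<String, Float> scores = new HashMap<>();
    scores.put("a", 1.0f);
    scores.put("b", 1.0f);
    scores.put("c", 1.0f);

    Map<String, List<String>> links = new HashMap<>();
    links.put("a", Arrays.asList("b", "c"));
    links.put("b", Arrays.asList("c"));
    links.put("c", Arrays.asList("a"));

    System.out.println(iterate(scores, links, 0.5f));
  }
}
```

With `d = 0.5`, page `a` receives 1.0 from `c`, page `b` receives 0.5 from `a`, and page `c` receives 0.5 from `a` plus 1.0 from `b`, so the expected scores are `a = 1.0`, `b = 0.75`, and `c = 1.25`. Values this small are easy to check by hand before asserting on them in a MemPipeline-based test.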

