Author: jwills
Date: Mon Aug 4 17:50:15 2014
New Revision: 1615711
URL: http://svn.apache.org/r1615711
Log:
CRUNCH-453: User guide updates
Modified:
crunch/site/trunk/content/user-guide.mdtext
Modified: crunch/site/trunk/content/user-guide.mdtext
URL:
http://svn.apache.org/viewvc/crunch/site/trunk/content/user-guide.mdtext?rev=1615711&r1=1615710&r2=1615711&view=diff
==============================================================================
--- crunch/site/trunk/content/user-guide.mdtext (original)
+++ crunch/site/trunk/content/user-guide.mdtext Mon Aug 4 17:50:15 2014
@@ -45,7 +45,7 @@ Notice: Licensed to the Apache Softwar
1. [Bloom Filter Joins](#bloomjoin)
1. [Cogroups](#cogroups)
1. [Sorting](#sorting)
- 1. [Standard and Reveserse Sorting](#stdsort)
+ 1. [Standard and Reverse Sorting](#stdsort)
1. [Secondary Sorts](#secsort)
1. [Other Operations](#otheropts)
1. [Cartesian Products](#cartesian)
@@ -169,40 +169,40 @@ into more detail about their usage in th
<a name="datamodel"></a>
### Data Model and Operators
-Crunch's Java API is centered around three interfaces that represent distributed datasets: [PCollection<T>](apidocs/0.9.0/org/apache/crunch/PCollection.html), [PTable<K, V>](http://crunch.apache.org/apidocs/0.9.0/org/apache/crunch/PTable.html), and [PGroupedTable<K, V>](apidocs/0.9.0/org/apache/crunch/PGroupedTable.html).
+Crunch's Java API is centered around three interfaces that represent distributed datasets: [PCollection<T>](apidocs/0.10.0/org/apache/crunch/PCollection.html), [PTable<K, V>](http://crunch.apache.org/apidocs/0.10.0/org/apache/crunch/PTable.html), and [PGroupedTable<K, V>](apidocs/0.10.0/org/apache/crunch/PGroupedTable.html).
A `PCollection<T>` represents a distributed, immutable collection of elements
of type T. For example, we represent a text file as a
-`PCollection<String>` object. `PCollection<T>` provides a method, *parallelDo*, that applies a [DoFn<T, U>](apidocs/0.9.0/org/apache/crunch/DoFn.html)
+`PCollection<String>` object. `PCollection<T>` provides a method, *parallelDo*, that applies a [DoFn<T, U>](apidocs/0.10.0/org/apache/crunch/DoFn.html)
to each element in the `PCollection<T>` in parallel, and returns a new
`PCollection<U>` as its result.
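As a quick illustration, here is a minimal parallelDo sketch; the word-splitting DoFn and variable names are invented for this example, not taken from the guide:

<pre>
  PCollection<String> lines = ...;
  // Emit every whitespace-separated token of each line; zero or more outputs per input
  PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
    public void process(String line, Emitter<String> emitter) {
      for (String word : line.split("\\s+")) {
        emitter.emit(word);
      }
    }
  }, Writables.strings());
</pre>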
A `PTable<K, V>` is a sub-interface of `PCollection<Pair<K, V>>` that
represents a distributed, unordered multimap of its key type K to its value
type V.
In addition to the parallelDo operation, PTable provides a *groupByKey*
operation that aggregates all of the values in the PTable that
have the same key into a single record. It is the groupByKey operation that
triggers the sort phase of a MapReduce job. Developers can exercise
fine-grained control over the number of reducers and the partitioning,
grouping, and sorting strategies used during the shuffle by providing an
instance
-of the [GroupingOptions](apidocs/0.9.0/org/apache/crunch/GroupingOptions.html) class to the `groupByKey` function.
+of the [GroupingOptions](apidocs/0.10.0/org/apache/crunch/GroupingOptions.html) class to the `groupByKey` function.
The result of a groupByKey operation is a `PGroupedTable<K, V>` object, which
is a distributed, sorted map of keys of type K to an Iterable<V> that may
be iterated over exactly once. In addition to `parallelDo` processing via
DoFns, PGroupedTable provides a *combineValues* operation that allows a
-commutative and associative [Aggregator<V>](apidocs/0.9.0/org/apache/crunch/Aggregator.html) to be applied to the values of the PGroupedTable
+commutative and associative [Aggregator<V>](apidocs/0.10.0/org/apache/crunch/Aggregator.html) to be applied to the values of the PGroupedTable
instance on both the map and reduce sides of the shuffle. A number of common
`Aggregator<V>` implementations are provided in the
-[Aggregators](apidocs/0.9.0/org/apache/crunch/fn/Aggregators.html) class.
+[Aggregators](apidocs/0.10.0/org/apache/crunch/fn/Aggregators.html) class.
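For instance, a small sketch of the groupByKey/combineValues flow (the word-count-style table below is illustrative):

<pre>
  PTable<String, Long> counts = ...;
  PGroupedTable<String, Long> grouped = counts.groupByKey();
  // Sum the values for each key; the Aggregator runs on both sides of the shuffle
  PTable<String, Long> totals = grouped.combineValues(Aggregators.SUM_LONGS());
</pre>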
Finally, PCollection, PTable, and PGroupedTable all support a *union*
operation, which takes a series of distinct PCollections that all have
the same data type and treats them as a single virtual PCollection.
All of the other data transformation operations supported by the Crunch APIs
(aggregations, joins, sorts, secondary sorts, and cogrouping) are implemented
-in terms of these four primitives. The patterns themselves are defined in the [org.apache.crunch.lib](apidocs/0.9.0/org/apache/crunch/lib/package-summary.html)
+in terms of these four primitives. The patterns themselves are defined in the [org.apache.crunch.lib](apidocs/0.10.0/org/apache/crunch/lib/package-summary.html)
package and its children, and a few of the most common patterns have convenience functions defined on the PCollection and PTable interfaces.
-Every Crunch data pipeline is coordinated by an instance of the [Pipeline](apidocs/0.9.0/org/apache/crunch/Pipeline.html) interface, which defines
-methods for reading data into a pipeline via [Source<T>](apidocs/0.9.0/org/apache/crunch/Source.html) instances and writing data out from a
-pipeline to [Target](apidocs/0.9.0/org/apache/crunch/Target.html) instances. There are currently three implementations of the Pipeline interface
+Every Crunch data pipeline is coordinated by an instance of the [Pipeline](apidocs/0.10.0/org/apache/crunch/Pipeline.html) interface, which defines
+methods for reading data into a pipeline via [Source<T>](apidocs/0.10.0/org/apache/crunch/Source.html) instances and writing data out from a
+pipeline to [Target](apidocs/0.10.0/org/apache/crunch/Target.html) instances. There are currently three implementations of the Pipeline interface
that are available for developers to use:
-1. [MRPipeline](apidocs/0.9.0/org/apache/crunch/impl/mr/MRPipeline.html): Executes the pipeline as a series of MapReduce jobs.
-2. [MemPipeline](apidocs/0.9.0/org/apache/crunch/impl/mem/MemPipeline.html): Executes the pipeline in-memory on the client.
-3. [SparkPipeline](apidocs/0.9.0/org/apache/crunch/impl/spark/SparkPipeline.html): Executes the pipeline by converting it to a series of Spark pipelines.
+1. [MRPipeline](apidocs/0.10.0/org/apache/crunch/impl/mr/MRPipeline.html): Executes the pipeline as a series of MapReduce jobs.
+2. [MemPipeline](apidocs/0.10.0/org/apache/crunch/impl/mem/MemPipeline.html): Executes the pipeline in-memory on the client.
+3. [SparkPipeline](apidocs/0.10.0/org/apache/crunch/impl/spark/SparkPipeline.html): Executes the pipeline by converting it to a series of Spark pipelines.
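To make the overall flow concrete, here is a minimal MRPipeline skeleton; the class name and paths are placeholders, not from the guide:

<pre>
  Pipeline pipeline = new MRPipeline(MyCrunchJob.class, new Configuration());
  PCollection<String> lines = pipeline.readTextFile("/user/crunch/input");
  pipeline.writeTextFile(lines, "/user/crunch/output");
  pipeline.done();
</pre>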
<a name="dataproc"></a>
## Data Processing with DoFns
@@ -346,7 +346,7 @@ framework won't kill it,
DoFns also have a number of helper methods for working with [Hadoop
Counters](http://codingwiththomas.blogspot.com/2011/04/controlling-hadoop-job-recursion.html),
all named `increment`. Counters are an incredibly useful way of keeping track
of the state of long-running data pipelines and detecting any exceptional
conditions that
occur during processing, and they are supported in both the MapReduce-based
and in-memory Crunch pipeline contexts. You can retrieve the value of the
Counters
-in your client code at the end of a MapReduce pipeline by getting them from
the
[StageResult](apidocs/0.9.0/org/apache/crunch/PipelineResult.StageResult.html)
+in your client code at the end of a MapReduce pipeline by getting them from
the
[StageResult](apidocs/0.10.0/org/apache/crunch/PipelineResult.StageResult.html)
objects returned by Crunch at the end of a run.
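As a small, hypothetical sketch of counter usage inside a DoFn's `process` method (the group and counter names are invented):

<pre>
  public void process(String input, Emitter<String> emitter) {
    if (input == null || input.isEmpty()) {
      increment("MyPipeline", "EMPTY_RECORDS");  // hypothetical group/counter names
      return;
    }
    emitter.emit(input);
  }
</pre>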
* `increment(String groupName, String counterName)` increments the value of
the given counter by 1.
@@ -378,18 +378,18 @@ memory setting for the DoFn's needs befo
### Common DoFn Patterns
The Crunch APIs contain a number of useful subclasses of DoFn that handle
common data processing scenarios and are easier
-to write and test. The top-level
[org.apache.crunch](apidocs/0.9.0/org/apache/crunch/package-summary.html)
package contains three
+to write and test. The top-level
[org.apache.crunch](apidocs/0.10.0/org/apache/crunch/package-summary.html)
package contains three
of the most important specializations, which we will discuss now. Each of
these specialized DoFn implementations has associated methods
on the PCollection, PTable, and PGroupedTable interfaces to support common
data processing steps.
-The simplest extension is the
[FilterFn<T>](apidocs/0.9.0/org/apache/crunch/FilterFn.html) class, which
defines a single abstract method, `boolean accept(T input)`.
+The simplest extension is the
[FilterFn<T>](apidocs/0.10.0/org/apache/crunch/FilterFn.html) class, which
defines a single abstract method, `boolean accept(T input)`.
The FilterFn can be applied to a `PCollection<T>` by calling the
`filter(FilterFn<T> fn)` method, and will return a new `PCollection<T>` that
only contains
the elements of the input PCollection for which the accept method returned
true. Note that the filter function does not include a PType argument in its
signature, because there is no change in the data type of the PCollection when
the FilterFn is applied. It is possible to compose new FilterFn
instances by combining multiple FilterFns together using the `and`, `or`, and
`not` factory methods defined in the
-[FilterFns](apidocs/0.9.0/org/apache/crunch/fn/FilterFns.html) helper class.
+[FilterFns](apidocs/0.10.0/org/apache/crunch/fn/FilterFns.html) helper class.
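For example, a simple FilterFn sketch (names are illustrative):

<pre>
  PCollection<String> lines = ...;
  PCollection<String> nonEmpty = lines.filter(new FilterFn<String>() {
    public boolean accept(String input) {
      return input != null && !input.isEmpty();
    }
  });
</pre>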
-The second extension is the [MapFn<S, T>](apidocs/0.9.0/org/apache/crunch/MapFn.html) class, which defines a single abstract method, `T map(S input)`.
+The second extension is the [MapFn<S, T>](apidocs/0.10.0/org/apache/crunch/MapFn.html) class, which defines a single abstract method, `T map(S input)`.
For simple transform tasks in which every input record will have exactly one
output, it's easy to test a MapFn by verifying that a given input returns a
given output.
@@ -400,7 +400,7 @@ the key be given and constructs a `PTabl
has methods `PTable<K1, V> mapKeys(MapFn<K, K1> mapFn)` and `PTable<K, V2>
mapValues(MapFn<V, V2>)` that handle the common case of converting
just one of the paired values in a PTable instance from one type to another
while leaving the other type the same.
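A minimal MapFn sketch (illustrative only), applied via parallelDo since a MapFn is just a specialized DoFn:

<pre>
  PCollection<String> lines = ...;
  PCollection<Integer> lengths = lines.parallelDo(new MapFn<String, Integer>() {
    public Integer map(String input) {
      return input.length();
    }
  }, Writables.ints());
</pre>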
-The final top-level extension to DoFn is the [CombineFn<K, V>](apidocs/0.9.0/org/apache/crunch/CombineFn.html) class, which is used in conjunction with
+The final top-level extension to DoFn is the [CombineFn<K, V>](apidocs/0.10.0/org/apache/crunch/CombineFn.html) class, which is used in conjunction with
the `combineValues` method defined on the PGroupedTable interface. CombineFns
are used to represent the associative operations that can be applied using
the MapReduce Combiner concept in order to reduce the amount of data that is
shipped over the network during a shuffle.
@@ -408,9 +408,9 @@ The CombineFn extension is different fro
beyond the default `process` method that any other DoFn would use; rather,
extending the CombineFn class signals to the Crunch planner that the logic
contained in this class satisfies the conditions required for use with the
MapReduce combiner.
-Crunch supports many types of these associative patterns, such as sums,
counts, and set unions, via the
[Aggregator<V>](apidocs/0.9.0/org/apache/crunch/Aggregator.html)
+Crunch supports many types of these associative patterns, such as sums,
counts, and set unions, via the
[Aggregator<V>](apidocs/0.10.0/org/apache/crunch/Aggregator.html)
interface, which is defined right alongside the CombineFn class in the
top-level `org.apache.crunch` package. There are a number of implementations of
the Aggregator
-interface defined via static factory methods in the
[Aggregators](apidocs/0.9.0/org/apache/crunch/fn/Aggregators.html) class. We
will discuss
+interface defined via static factory methods in the
[Aggregators](apidocs/0.10.0/org/apache/crunch/fn/Aggregators.html) class. We
will discuss
Aggregators more in the section on [common MapReduce patterns](#aggregators).
<a name="serde"></a>
@@ -434,11 +434,11 @@ against an existing PCollection, **the r
}
</pre>
-Crunch supports two different _type families_, which each implement the [PTypeFamily](apidocs/0.9.0/org/apache/crunch/types/PTypeFamily.html) interface:
-one for Hadoop's [Writable interface](apidocs/0.9.0/org/apache/crunch/types/writable/WritableTypeFamily.html) and another based on
-[Apache Avro](apidocs/0.9.0/org/apache/crunch/types/avro/AvroTypeFamily.html). There are also classes that contain static factory methods for
-each PTypeFamily to allow for easy import and usage: one for [Writables](apidocs/0.9.0/org/apache/crunch/types/writable/Writables.html) and one for
-[Avros](apidocs/0.9.0/org/apache/crunch/types/avro/Avros.html).
+Crunch supports two different _type families_, which each implement the [PTypeFamily](apidocs/0.10.0/org/apache/crunch/types/PTypeFamily.html) interface:
+one for Hadoop's [Writable interface](apidocs/0.10.0/org/apache/crunch/types/writable/WritableTypeFamily.html) and another based on
+[Apache Avro](apidocs/0.10.0/org/apache/crunch/types/avro/AvroTypeFamily.html). There are also classes that contain static factory methods for
+each PTypeFamily to allow for easy import and usage: one for [Writables](apidocs/0.10.0/org/apache/crunch/types/writable/Writables.html) and one for
+[Avros](apidocs/0.10.0/org/apache/crunch/types/avro/Avros.html).
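For example, a few PTypes built from the Writable family's static factory methods (a sketch; the Avros class has analogous methods):

<pre>
  PType<String> stringType = Writables.strings();
  PType<Pair<String, Long>> pairType = Writables.pairs(Writables.strings(), Writables.longs());
  PTableType<String, Long> tableType = Writables.tableOf(Writables.strings(), Writables.longs());
</pre>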
The two different type families exist for historical reasons: Writables have
long been the standard form for representing serializable data in Hadoop,
but the Avro based serialization scheme is very compact, fast, and allows for
complex record schemas to evolve over time. It's fine (and even encouraged)
@@ -479,7 +479,7 @@ can be used to kick off a shuffle on the
</pre>
If you find yourself in a situation where you have a PCollection<Pair<K, V>>
and you need a PTable<K, V>, the
-[PTables](apidocs/0.9.0/org/apache/crunch/lib/PTables.html) library class has
methods that will do the conversion for you.
+[PTables](apidocs/0.10.0/org/apache/crunch/lib/PTables.html) library class has
methods that will do the conversion for you.
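A sketch of that conversion, assuming the `asPTable` helper in the PTables class:

<pre>
  PCollection<Pair<String, Long>> pairs = ...;
  PTable<String, Long> table = PTables.asPTable(pairs);
</pre>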
Let's look at some more example PTypes created using the common primitive and
collection types. For most of your pipelines,
you will use one type family exclusively, and so you can cut down on some of
the boilerplate in your classes by importing
@@ -550,7 +550,7 @@ includes both Avro generic and specific
PType<Record> avroGenericType = Avros.generics(schema);
</pre>
-The [Avros](apidocs/0.9.0/org/apache/crunch/types/avro/Avros.html) class also
has a `reflects` method for creating PTypes
+The [Avros](apidocs/0.10.0/org/apache/crunch/types/avro/Avros.html) class also
has a `reflects` method for creating PTypes
for POJOs using Avro's reflection-based serialization mechanism. There are a
couple of restrictions on the structure of
the POJO:
@@ -589,7 +589,7 @@ to query intermediate results to aid in
The simplest way to create a new `PType<T>` for a data object is to create a
_derived_ PType from one of the built-in PTypes from the Avro
and Writable type families. If we have a base `PType<S>`, we can create a
derived `PType<T>` by implementing an input `MapFn<S, T>` and an
output `MapFn<T, S>` and then calling `PTypeFamily.derived(Class<T>, MapFn<S,
T> in, MapFn<T, S> out, PType<S> base)`, which will return
-a new `PType<T>`. There are examples of derived PTypes in the
[PTypes](apidocs/0.9.0/org/apache/crunch/types/PTypes.html) class, including
+a new `PType<T>`. There are examples of derived PTypes in the
[PTypes](apidocs/0.10.0/org/apache/crunch/types/PTypes.html) class, including
serialization support for protocol buffers, Thrift records, Java Enums,
BigInteger, and UUIDs. The [crunch
module](https://github.com/kevinweil/elephant-bird/tree/master/crunch) of
[Twitter's ElephantBird](https://github.com/kevinweil/elephant-bird/) project
also defines PTypes for working with
protocol buffers and Thrift records that are serialized using ElephantBird's
`BinaryWritable<T>`.
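As an illustrative sketch (the java.net.URI example is made up) of creating a derived PType on top of a String base type:

<pre>
  PTypeFamily tf = WritableTypeFamily.getInstance();
  PType<URI> uriType = tf.derived(URI.class,
      new MapFn<String, URI>() {
        public URI map(String input) { return URI.create(input); }
      },
      new MapFn<URI, String>() {
        public String map(URI uri) { return uri.toString(); }
      },
      tf.strings());
</pre>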
@@ -660,7 +660,7 @@ You use a Source in conjunction with one
Writables.tableOf(Writables.longs(), Writables.bytes())));
</pre>
-Note that Sources usually require a PType to be specified when they are
created. The [From](apidocs/0.9.0/org/apache/crunch/io/From.html)
+Note that Sources usually require a PType to be specified when they are
created. The [From](apidocs/0.10.0/org/apache/crunch/io/From.html)
class provides a number of factory methods for literate Source creation:
<pre>
@@ -701,28 +701,28 @@ Here is a table of commonly used Sources
</tr>
<tr>
<td>Text</td>
- <td><a
href="apidocs/0.9.0/org/apache/crunch/io/text/TextFileSource.html">org.apache.crunch.io.text.TextFileSource</a></td>
+ <td><a
href="apidocs/0.10.0/org/apache/crunch/io/text/TextFileSource.html">org.apache.crunch.io.text.TextFileSource</a></td>
<td>PCollection<String></td>
<td>textFile</td>
<td>Works for both TextInputFormat and AvroUtf8InputFormat</td>
</tr>
<tr>
<td>Sequence</td>
- <td><a
href="apidocs/0.9.0/org/apache/crunch/io/seq/SeqFileTableSource.html">org.apache.crunch.io.seq.SeqFileTableSource</a></td>
+ <td><a
href="apidocs/0.10.0/org/apache/crunch/io/seq/SeqFileTableSource.html">org.apache.crunch.io.seq.SeqFileTableSource</a></td>
<td>PTable<K, V></td>
<td>sequenceFile</td>
- <td>Also has a <a
href="apidocs/0.9.0/org/apache/crunch/io/seq/SeqFileSource.html">SeqFileSource</a>
which reads the value and ignores the key.</td>
+ <td>Also has a <a
href="apidocs/0.10.0/org/apache/crunch/io/seq/SeqFileSource.html">SeqFileSource</a>
which reads the value and ignores the key.</td>
</tr>
<tr>
<td>Avro</td>
- <td><a
href="apidocs/0.9.0/org/apache/crunch/io/avro/AvroFileSource.html">org.apache.crunch.io.avro.AvroFileSource</a></td>
+ <td><a
href="apidocs/0.10.0/org/apache/crunch/io/avro/AvroFileSource.html">org.apache.crunch.io.avro.AvroFileSource</a></td>
<td>PCollection<V></td>
<td>avroFile</td>
<td>No PTable analogue for Avro records.</td>
</tr>
<tr>
<td>Parquet</td>
- <td><a
href="apidocs/0.9.0/org/apache/crunch/io/parquet/AvroParquetFileSource.html">org.apache.crunch.io.parquet.AvroParquetFileSource</a></td>
+ <td><a
href="apidocs/0.10.0/org/apache/crunch/io/parquet/AvroParquetFileSource.html">org.apache.crunch.io.parquet.AvroParquetFileSource</a></td>
<td>PCollection<V></td>
<td>N/A</td>
<td>Reads Avro records from a parquet-formatted file; expects an Avro
PType.</td>
@@ -750,7 +750,7 @@ The PCollection interface also declares
</pre>
Just as the Source interface has the `From` class of factory methods, Target
factory methods are defined in a class named
-[To](apidocs/0.9.0/org/apache/crunch/io/To.html) to enable literate
programming:
+[To](apidocs/0.10.0/org/apache/crunch/io/To.html) to enable literate
programming:
<pre>
lines.write(To.textFile("/user/crunch/textout"));
@@ -784,25 +784,25 @@ Here is a table of commonly used Targets
</tr>
<tr>
<td>Text</td>
- <td><a
href="apidocs/0.9.0/org/apache/crunch/io/text/TextFileTarget.html">org.apache.crunch.io.text.TextFileTarget</a></td>
+ <td><a
href="apidocs/0.10.0/org/apache/crunch/io/text/TextFileTarget.html">org.apache.crunch.io.text.TextFileTarget</a></td>
<td>textFile</td>
<td>Will write out the string version of whatever it's given, which should
be text. See also: Pipeline.writeTextFile.</td>
</tr>
<tr>
<td>Sequence</td>
- <td><a
href="apidocs/0.9.0/org/apache/crunch/io/seq/SeqFileTarget.html">org.apache.crunch.io.seq.SeqFileTarget</a></td>
+ <td><a
href="apidocs/0.10.0/org/apache/crunch/io/seq/SeqFileTarget.html">org.apache.crunch.io.seq.SeqFileTarget</a></td>
<td>sequenceFile</td>
<td>Works on both PCollection and PTable.</td>
</tr>
<tr>
<td>Avro</td>
- <td><a
href="apidocs/0.9.0/org/apache/crunch/io/avro/AvroFileTarget.html">org.apache.crunch.io.avro.AvroFileTarget</a></td>
+ <td><a
href="apidocs/0.10.0/org/apache/crunch/io/avro/AvroFileTarget.html">org.apache.crunch.io.avro.AvroFileTarget</a></td>
<td>avroFile</td>
<td>Treats PTables as PCollections of Pairs.</td>
</tr>
<tr>
<td>Parquet</td>
- <td><a
href="apidocs/0.9.0/org/apache/crunch/io/parquet/AvroParquetFileTarget.html">org.apache.crunch.io.parquet.AvroParquetFileTarget</a></td>
+ <td><a
href="apidocs/0.10.0/org/apache/crunch/io/parquet/AvroParquetFileTarget.html">org.apache.crunch.io.parquet.AvroParquetFileTarget</a></td>
<td>N/A</td>
<td>Writes Avro records to parquet-formatted files; expects an Avro
PType.</td>
</tr>
@@ -814,7 +814,7 @@ Here is a table of commonly used Targets
The `SourceTarget<T>` interface extends both the `Source<T>` and `Target`
interfaces and allows a Path to act as both a
Target for some PCollections as well as a Source for others. SourceTargets are
convenient for any intermediate outputs within
your pipeline. Just as we have the factory methods in the From and To classes
for Sources and Targets, factory methods for
-SourceTargets are declared in the
[At](apidocs/0.9.0/org/apache/crunch/io/At.html) class.
+SourceTargets are declared in the
[At](apidocs/0.10.0/org/apache/crunch/io/At.html) class.
In many pipeline applications, we want to control how any existing files in
our target paths are handled by Crunch. For example,
we might want the pipeline to fail quickly if an output path already exists,
or we might want to delete the existing files
@@ -822,7 +822,7 @@ and overwrite them with our new outputs.
Checkpoints allow us to specify that a Path should be used as the starting
location for our pipeline execution if the data
it contains is newer than the data in the paths associated with any upstream
inputs to that output location.
-Crunch supports these different output options via the
[WriteMode](apidocs/0.9.0/org/apache/crunch/Target.WriteMode.html) enum,
+Crunch supports these different output options via the
[WriteMode](apidocs/0.10.0/org/apache/crunch/Target.WriteMode.html) enum,
which can be passed along with a Target to the `write` method on either
PCollection or Pipeline. Here are the supported
WriteModes for Crunch:
@@ -864,13 +864,13 @@ one of the `run` methods on the Pipeline
Crunch to materialize multiple PCollections and have them all created within a
single Pipeline run.
If you ask Crunch to materialize a PCollection that is returned from
Pipeline's `PCollection<T> read(Source<T> source)` method, then no
-MapReduce job will be executed if the given Source implements the
[ReadableSource](apidocs/0.9.0/org/apache/crunch/io/ReadableSource.html)
+MapReduce job will be executed if the given Source implements the
[ReadableSource](apidocs/0.10.0/org/apache/crunch/io/ReadableSource.html)
interface. If the Source is not readable, then a map-only job will be executed
to map the data to a format that Crunch knows how to
read from disk.
Sometimes, the output of a Crunch pipeline will be a single value, such as the
number of elements in a PCollection. In other instances,
you may want to perform some additional client-side computations on the
materialized contents of a PCollection in a way that is
-transparent to users of your libraries. For these situations, Crunch defines a
[PObject<V>](apidocs/0.9.0/org/apache/crunch/PObject.html)
+transparent to users of your libraries. For these situations, Crunch defines a
[PObject<V>](apidocs/0.10.0/org/apache/crunch/PObject.html)
interface that has an associated `V getValue()` method. PCollection's
`PObject<Long> length()` method returns a reference to the number
of elements contained in that PCollection, but the pipeline tasks required to
compute this value will not run until the `Long getValue()`
method of the returned PObject is called.
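For example (a minimal sketch):

<pre>
  PCollection<String> lines = ...;
  PObject<Long> count = lines.length();
  // No jobs run until getValue() forces the pipeline to execute
  Long numberOfLines = count.getValue();
</pre>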
@@ -879,7 +879,7 @@ method of the returned PObject is called
## Data Processing Patterns in Crunch
This section describes the various data processing patterns implemented in
Crunch's library APIs,
-which are in the
[org.apache.crunch.lib](apidocs/0.9.0/org/apache/crunch/lib/package-summary.html)
+which are in the
[org.apache.crunch.lib](apidocs/0.10.0/org/apache/crunch/lib/package-summary.html)
package.
<a name="gbk"></a>
@@ -896,7 +896,7 @@ explicitly provided by the developer bas
3. `groupByKey(GroupingOptions options)`: Complex shuffle operations that
require custom partitions
and comparators.
-The [GroupingOptions](apidocs/0.9.0/org/apache/crunch/GroupingOptions.html)
class allows developers
+The [GroupingOptions](apidocs/0.10.0/org/apache/crunch/GroupingOptions.html)
class allows developers
to exercise precise control over how data is partitioned, sorted, and grouped
by the underlying
execution engine. Crunch was originally developed on top of MapReduce, and so
the GroupingOptions APIs
expect instances of Hadoop's
[Partitioner](http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Partitioner.html)
@@ -905,7 +905,7 @@ classes in order to support partitions a
same classes may also be used with other execution engines, like Apache Spark,
without a rewrite.
The GroupingOptions class is immutable; to create a new one, take advantage of
the
-[GroupingOptions.Builder](apidocs/0.9.0/org/apache/crunch/GroupingOptions.Builder.html)
implementation.
+[GroupingOptions.Builder](apidocs/0.10.0/org/apache/crunch/GroupingOptions.Builder.html)
implementation.
<pre>
GroupingOptions opts = GroupingOptions.builder()
@@ -931,10 +931,10 @@ Calling one of the groupByKey methods on
PGroupedTable provides a `combineValues` that can be used to signal to the
planner that we want to perform
associative aggregations on our data both before and after the shuffle.
-There are two ways to use combineValues: you can create an extension of the
[CombineFn](apidocs/0.9.0/org/apache/crunch/CombineFn.html)
-abstract base class, or you can use an instance of the
[Aggregator<V>](apidocs/0.9.0/org/apache/crunch/Aggregator.html)
+There are two ways to use combineValues: you can create an extension of the
[CombineFn](apidocs/0.10.0/org/apache/crunch/CombineFn.html)
+abstract base class, or you can use an instance of the
[Aggregator<V>](apidocs/0.10.0/org/apache/crunch/Aggregator.html)
interface. Of the two, an Aggregator is probably the way you want to go;
Crunch provides a number of
-[Aggregators](0.9.0/org/apache/crunch/fn/Aggregators.html), and they are a bit easier to write and compose together.
+[Aggregators](apidocs/0.10.0/org/apache/crunch/fn/Aggregators.html), and they are a bit easier to write and compose together.
Let's walk through a few example aggregations:
<pre>
@@ -978,7 +978,7 @@ the average of a set of values:
Many of the most common aggregation patterns in Crunch are provided as methods
on the PCollection
interface, including `count`, `max`, `min`, and `length`. The implementations
of these methods,
-however, are in the
[Aggregate](apidocs/0.9.0/org/apache/crunch/lib/Aggregate.html) library class.
+however, are in the
[Aggregate](apidocs/0.10.0/org/apache/crunch/lib/Aggregate.html) library class.
The methods in the Aggregate class expose some additional options that you can
use for performing
aggregations, such as controlling the level of parallelism for count
operations:
@@ -1002,9 +1002,9 @@ most frequently occuring elements, you w
### Joining Data
Joins in Crunch are based on equal-valued keys in different PTables. Joins
have also evolved
-a great deal in Crunch over the lifetime of the project. The
[Join](apidocs/0.9.0/org/apache/crunch/lib/Join.html)
+a great deal in Crunch over the lifetime of the project. The
[Join](apidocs/0.10.0/org/apache/crunch/lib/Join.html)
API provides simple methods for performing equijoins, left joins, right joins,
and full joins, but modern
-Crunch joins are usually performed using an explicit implementation of the
[JoinStrategy](apidocs/0.9.0/org/apache/crunch/lib/join/JoinStrategy.html)
+Crunch joins are usually performed using an explicit implementation of the
[JoinStrategy](apidocs/0.10.0/org/apache/crunch/lib/join/JoinStrategy.html)
interface, which has support for the same rich set of joins that you can use
in tools like Apache Hive and
Apache Pig.
@@ -1017,16 +1017,20 @@ All of the algorithms discussed below im
PTable<K, Pair<V1, V2>> joined = strategy.join(one, two,
JoinType);
</pre>
-The [JoinType](apidocs/0.9.0/org/apache/crunch/lib/join/JoinType.html) enum
determines which
+The [JoinType](apidocs/0.10.0/org/apache/crunch/lib/join/JoinType.html) enum
determines which
kind of join is applied: inner, outer, left, right, or full. In general, the
smaller of the two
-inputs should be the left-most argument to the join method. The only exception
to this (for unfortunate
-historical reasons that the Crunch developers deeply apologize for) is for
mapside-joins, where the
-left-most argument should be the *larger* input.
+inputs should be the left-most argument to the join method.
+
+Note that the values of the PTables you join should be non-null. The join
+algorithms in Crunch use null as a placeholder to represent that there are no
values for
+a given key in a PCollection, so joining PTables that contain null values may
have
+surprising results. Using a non-null dummy value in your PCollections is a
good idea in
+general.
<a name="reducejoin"></a>
#### Reduce-side Joins
-Reduce-side joins are handled by the
[DefaultJoinStrategy](apidocs/0.9.0/org/apache/crunch/lib/join/DefaultJoinStrategy.html).
+Reduce-side joins are handled by the
[DefaultJoinStrategy](apidocs/0.10.0/org/apache/crunch/lib/join/DefaultJoinStrategy.html).
Reduce-side joins are the simplest and most robust kind of joins in Hadoop;
the keys from the two inputs are
shuffled together to the reducers, where the values from the smaller of the
two collections are collected and then
streamed over the values from the larger of the two collections. You can
control the number of reducers that is used
@@ -1035,18 +1039,25 @@ to perform the join by passing an intege
<a name="mapjoin"></a>
#### Map-side Joins
-Map-side joins are handled by the
[MapsideJoinStrategy](apidocs/0.9.0/org/apache/crunch/lib/join/MapsideJoinStrategy.html).
+Map-side joins are handled by the
[MapsideJoinStrategy](apidocs/0.10.0/org/apache/crunch/lib/join/MapsideJoinStrategy.html).
Map-side joins require that the smaller of the two input tables is loaded into
memory on the tasks on the cluster, so
there is a requirement that at least one of the tables be relatively small so
that it can comfortably fit into memory within
-each task. *Remember, the MapsideJoinStrategy is the only JoinStrategy
implementation where the left-most argument should
-be larger than the right-most one.*
+each task.
+
+For a long time, the MapsideJoinStrategy differed from the rest of the
JoinStrategy
+implementations in that the left-most argument was intended to be larger than
the right-side
+one, since the right-side PTable was loaded into memory. Since Crunch
0.10.0/0.8.3, we
+have deprecated the old MapsideJoinStrategy constructor which had the sizes
reversed and
+recommend that you use the `MapsideJoinStrategy.create()` factory method,
which returns an
+implementation of the MapsideJoinStrategy in which the left-side PTable is
loaded into
+memory instead of the right-side PTable.
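A hedged sketch of the recommended usage (types and variable names are illustrative):

<pre>
  PTable<String, String> small = ...;  // left side, loaded into memory on each task
  PTable<String, String> large = ...;
  JoinStrategy<String, String, String> mapside = MapsideJoinStrategy.create();
  PTable<String, Pair<String, String>> joined = mapside.join(small, large, JoinType.INNER_JOIN);
</pre>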
<a name="shardedjoin"></a>
#### Sharded Joins
Many distributed joins have skewed data that can cause regular reduce-side
joins to fail due to out-of-memory issues on
the partitions that happen to contain the keys with highest cardinality. To
handle these skew issues, Crunch has the
-[ShardedJoinStrategy](apidocs/0.9.0/org/apache/crunch/lib/join/ShardedJoinStrategy.html)
that allows developers to shard
+[ShardedJoinStrategy](apidocs/0.10.0/org/apache/crunch/lib/join/ShardedJoinStrategy.html)
that allows developers to shard
each key to multiple reducers, which prevents a few reducers from getting
overloaded with the values from the skewed keys
in exchange for sending more data over the wire. For problems with significant
skew issues, the ShardedJoinStrategy can
significantly improve performance.
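A sketch of its usage, assuming the constructor that takes the number of shards per key:

<pre>
  PTable<String, String> skewed = ...;
  PTable<String, String> other = ...;
  // Spread each key across (roughly) four reducers to dilute skewed keys
  JoinStrategy<String, String, String> sharded = new ShardedJoinStrategy<String, String, String>(4);
  PTable<String, Pair<String, String>> joined = sharded.join(skewed, other, JoinType.INNER_JOIN);
</pre>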
@@ -1054,7 +1065,7 @@ significantly improve performance.
<a name="bloomjoin"></a>
#### Bloom Filter Joins
-Last but not least, the
[BloomFilterJoinStrategy](apidocs/0.9.0/org/apache/crunch/lib/join/BloomFilterJoinStrategy.html)
builds
+Last but not least, the
[BloomFilterJoinStrategy](apidocs/0.10.0/org/apache/crunch/lib/join/BloomFilterJoinStrategy.html)
builds
a [bloom filter](http://en.wikipedia.org/wiki/Bloom_filter) on the left-hand
side table that is used to filter the contents
of the right-hand side table to eliminate entries from the (larger) right-hand
side table that have no hope of being joined
to values in the left-hand side table. This is useful in situations in which
the left-hand side table is too large to fit
@@ -1068,7 +1079,7 @@ Some kinds of joins are richer and more
For example, we might want to join two datasets
together and only emit a record if each of the sets had at least two distinct
values associated
with each key. For arbitrary complex join logic, we can always fall back to the
-[Cogroup](apidocs/0.9.0/org/apache/crunch/lib/Cogroup.html) API, which takes
in an arbitrary number
+[Cogroup](apidocs/0.10.0/org/apache/crunch/lib/Cogroup.html) API, which takes
in an arbitrary number
of PTable instances that all have the same key type and combines them together
into a single
PTable whose values are made up of Collections of the values from each of the
input PTables.
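For example (a minimal sketch):

<pre>
  PTable<String, Long> left = ...;
  PTable<String, String> right = ...;
  PTable<String, Pair<Collection<Long>, Collection<String>>> cogrouped = Cogroup.cogroup(left, right);
</pre>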
@@ -1099,7 +1110,7 @@ more advanced patterns like secondary so
<a name="stdsort"></a>
#### Standard and Reverse Sorting
-The [Sort](apidocs/0.9.0/org/apache/crunch/lib/Sort.html) API methods contain
utility functions
+The [Sort](apidocs/0.10.0/org/apache/crunch/lib/Sort.html) API methods contain
utility functions
for sorting the contents of PCollections and PTables whose contents implement
the `Comparable`
interface. By default, MapReduce does not perform total sorts on its keys
during a shuffle; instead
a sort is done locally on each of the partitions of the data that are sent to
each reducer. Doing
@@ -1122,7 +1133,7 @@ total order partitioner and sorting cont
For more complex PCollections or PTables that are made up of Tuples (Pairs,
Tuple3, etc.), we can
specify which columns of the Tuple should be used for sorting the contents,
and in which order, using
-the [ColumnOrder](apidocs/0.9.0/org/apache/crunch/lib/Sort.ColumnOrder.html)
class:
+the [ColumnOrder](apidocs/0.10.0/org/apache/crunch/lib/Sort.ColumnOrder.html)
class:
<pre>
PTable<String, Long> table = ...;
@@ -1135,7 +1146,7 @@ the [ColumnOrder](apidocs/0.9.0/org/apac
Another pattern that occurs frequently in distributed processing is _secondary
sorts_, where we
want to group a set of records by one key and sort the records within each
group by a second key.
-The [SecondarySort](apidocs/0.9.0/org/apache/crunch/lib/SecondarySort.html)
API provides a set
+The [SecondarySort](apidocs/0.10.0/org/apache/crunch/lib/SecondarySort.html)
API provides a set
of `sortAndApply` methods that can be used on input PTables of the form
`PTable<K, Pair<K2, V>>`,
where `K` is the primary grouping key and `K2` is the secondary grouping key.
The `sortAndApply`
method will perform the grouping and sorting and will then apply a given DoFn
to process the
@@ -1155,7 +1166,7 @@ one of the datasets to be small enough t
set where we emit an element of the smaller data set along with each element
from the larger set.
When this pattern isn't possible but we still need to take the cartesian
product, we have some options,
-but they're fairly expensive. Crunch's
[Cartesian](apidocs/0.9.0/org/apache/crunch/lib/Cartesian.html) API
+but they're fairly expensive. Crunch's
[Cartesian](apidocs/0.10.0/org/apache/crunch/lib/Cartesian.html) API
provides methods for a reduce-side full cross product between two PCollections
(or PTables.) Note that
this is a pretty expensive operation, and you should go out of your way to
avoid these kinds of processing
steps in your pipelines.
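A sketch of the reduce-side cross product (illustrative types):

<pre>
  PCollection<String> users = ...;
  PCollection<Long> timestamps = ...;
  // Every (user, timestamp) pair; expensive for large inputs
  PCollection<Pair<String, Long>> crossed = Cartesian.cross(users, timestamps);
</pre>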
@@ -1165,7 +1176,7 @@ steps in your pipelines.
Many MapReduce jobs have the potential to generate a large number of small
files that could be used more
effectively by clients if they were all merged together into a small number of
large files. The
-[Shard](apidocs/0.9.0/org/apache/crunch/lib/Shard.html) API provides a single
method, `shard`, that allows
+[Shard](apidocs/0.10.0/org/apache/crunch/lib/Shard.html) API provides a single
method, `shard`, that allows
you to coalesce a given PCollection into a fixed number of partitions:
<pre>
@@ -1179,7 +1190,7 @@ partitions. This is often a useful step
<a name="distinct"></a>
#### Distinct
-Crunch's [Distinct](apidocs/0.9.0/org/apache/crunch/lib/Distinct.html) API has
a method, `distinct`, that
+Crunch's [Distinct](apidocs/0.10.0/org/apache/crunch/lib/Distinct.html) API
has a method, `distinct`, that
returns one copy of each unique element in a given PCollection:
<pre>
@@ -1205,7 +1216,7 @@ thus the amount of memory they consume)
<a name="sampling"></a>
#### Sampling
-The [Sample](apidocs/0.9.0/org/apache/crunch/lib/Sample.html) API provides
methods for two sorts of PCollection
+The [Sample](apidocs/0.10.0/org/apache/crunch/lib/Sample.html) API provides
methods for two sorts of PCollection
sampling: random and reservoir.
Random sampling is where you include each record in the sample with a fixed
probability, and is probably what you're
@@ -1237,7 +1248,7 @@ only require a single pass over the data
<a name="sets"></a>
#### Set Operations
-The [Set](apidocs/0.9.0/org/apache/crunch/lib/Set.html) API methods complement
Crunch's built-in `union` methods and
+The [Set](apidocs/0.10.0/org/apache/crunch/lib/Set.html) API methods
complement Crunch's built-in `union` methods and
provide support for finding the intersection, the difference, or the
[comm](http://en.wikipedia.org/wiki/Comm) of two PCollections.
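For example (a minimal sketch):

<pre>
  PCollection<String> a = ...;
  PCollection<String> b = ...;
  PCollection<String> inBoth = Set.intersection(a, b);
  PCollection<String> onlyInA = Set.difference(a, b);
</pre>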
<a name="splits"></a>
@@ -1245,7 +1256,7 @@ provide support for finding the intersec
Sometimes, you want to write two different outputs from the same DoFn into
different PCollections. An example of this would
be a pipeline in which you wanted to write good records to one file and bad or
corrupted records to a different file for
-further examination. The
[Channels](apidocs/0.9.0/org/apache/crunch/lib/Channels.html) class provides a
method that allows
+further examination. The
[Channels](apidocs/0.10.0/org/apache/crunch/lib/Channels.html) class provides a
method that allows
you to split an input PCollection of Pairs into a Pair of PCollections:
<pre>
@@ -1323,18 +1334,18 @@ flexible schemas for PCollections and PT
`Put`, `KeyValue`, and `Result`.
Be sure that the version of Crunch that you're using is compatible with the
version of HBase that you are running. The 0.8.x
-Crunch versions and earlier ones are developed against HBase 0.94.x, while
version 0.9.0 and after are developed against
+Crunch versions and earlier ones are developed against HBase 0.94.x, while
version 0.10.0 and after are developed against
HBase 0.96. There were a small number of backwards-incompatible changes made
between HBase 0.94 and 0.96 that are reflected
in the Crunch APIs for working with HBase. The most important of these is that
in HBase 0.96, HBase's `Put`, `KeyValue`, and `Result`
-classes no longer implement the Writable interface. To support working with
these types in Crunch 0.9.0, we added the
-[HBaseTypes](apidocs/0.9.0/org/apache/crunch/io/hbase/HBaseTypes.html) class
that has factory methods for creating PTypes that serialize the HBase client
classes to bytes so
+classes no longer implement the Writable interface. To support working with
these types in Crunch 0.10.0, we added the
+[HBaseTypes](apidocs/0.10.0/org/apache/crunch/io/hbase/HBaseTypes.html) class
that has factory methods for creating PTypes that serialize the HBase client
classes to bytes so
that they can still be used as part of MapReduce pipelines.
-Crunch supports working with HBase data in two ways. The
[HBaseSourceTarget](apidocs/0.9.0/org/apache/crunch/io/hbase/HBaseSourceTarget.html)
and [HBaseTarget](apidocs/0.9.0/org/apache/crunch/io/hbase/HBaseTarget.html)
classes support reading and
-writing data to HBase tables directly. The
[HFileSource](apidocs/0.9.0/org/apache/crunch/io/hbase/HFileSource.html) and
[HFileTarget](apidocs/0.9.0/org/apache/crunch/io/hbase/HFileTarget.html)
classes support reading and writing data
+Crunch supports working with HBase data in two ways. The
[HBaseSourceTarget](apidocs/0.10.0/org/apache/crunch/io/hbase/HBaseSourceTarget.html)
and [HBaseTarget](apidocs/0.10.0/org/apache/crunch/io/hbase/HBaseTarget.html)
classes support reading and
+writing data to HBase tables directly. The
[HFileSource](apidocs/0.10.0/org/apache/crunch/io/hbase/HFileSource.html) and
[HFileTarget](apidocs/0.10.0/org/apache/crunch/io/hbase/HFileTarget.html)
classes support reading and writing data
to hfiles, which are the underlying file format for HBase. HFileSource and
HFileTarget can be used to read and write data to
hfiles directly, which is much faster than going through the HBase APIs and
can be used to perform efficient bulk loading of data
-into HBase tables. See the utility methods in the
[HFileUtils](apidocs/0.9.0/org/apache/crunch/io/hbase/HFileUtils.html) class for
+into HBase tables. See the utility methods in the
[HFileUtils](apidocs/0.10.0/org/apache/crunch/io/hbase/HFileUtils.html) class
for
more details on how to work with PCollections against hfiles.
<a name="exec"></a>
@@ -1343,16 +1354,16 @@ more details on how to work with PCollec
Crunch uses a lazy execution model. No jobs are run or outputs created until
the user explicitly invokes one of the methods on the
Pipeline interface that controls job planning and execution. The simplest of
these methods is the `PipelineResult run()` method,
which analyzes the current graph of PCollections and Target outputs and comes
up with a plan to ensure that each of the outputs is
-created and then executes it, returning only when the jobs are completed. The
[PipelineResult](apidocs/0.9.0/org/apache/crunch/PipelineResult.html)
+created and then executes it, returning only when the jobs are completed. The
[PipelineResult](apidocs/0.10.0/org/apache/crunch/PipelineResult.html)
returned by the `run` method contains information about what was run,
including the number of jobs that were executed during the
-pipeline run and the values of the Hadoop Counters for each of those stages
via the
[StageResult](apidocs/0.9.0/org/apache/crunch/PipelineResult.StageResult.html)
component classes.
+pipeline run and the values of the Hadoop Counters for each of those stages
via the
[StageResult](apidocs/0.10.0/org/apache/crunch/PipelineResult.StageResult.html)
component classes.
The last method that should be called in _any_ Crunch pipeline run is the
Pipeline interface's `PipelineResult done()` method. The done method will
ensure that any outputs that have not yet been created are generated via a call to `run`, and it will clean up the temporary directories that
Crunch creates during runs to hold serialized job information and intermediate
outputs.
Crunch also allows developers to execute finer-grained control over pipeline
execution via Pipeline's `PipelineExecution runAsync()` method.
-The `runAsync` method is a non-blocking version of the `run` method that
returns a
[PipelineExecution](apidocs/0.9.0/org/apache/crunch/PipelineExecution.html)
instance that can be used to monitor the currently running Crunch pipeline. The
PipelineExecution object is also useful for debugging
+The `runAsync` method is a non-blocking version of the `run` method that
returns a
[PipelineExecution](apidocs/0.10.0/org/apache/crunch/PipelineExecution.html)
instance that can be used to monitor the currently running Crunch pipeline. The
PipelineExecution object is also useful for debugging
Crunch pipelines by visualizing the Crunch execution plan in DOT format via
its `String getPlanDotFile()` method. PipelineExecution implements
Guava's
[ListenableFuture](https://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained),
so you can attach handlers that will be
called when your pipeline finishes executing.
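A brief sketch of the asynchronous pattern (assuming the `waitUntilDone` method on PipelineExecution):

<pre>
  PipelineExecution execution = pipeline.runAsync();
  // Inspect the planned job DAG while the pipeline runs
  System.out.println(execution.getPlanDotFile());
  execution.waitUntilDone();
</pre>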
@@ -1373,7 +1384,7 @@ the different execution engines.
<a name="mrpipeline"></a>
### MRPipeline
-The [MRPipeline](apidocs/0.9.0/org/apache/crunch/impl/mr/MRPipeline.html) is
the oldest implementation of the Pipeline interface and
+The [MRPipeline](apidocs/0.10.0/org/apache/crunch/impl/mr/MRPipeline.html) is
the oldest implementation of the Pipeline interface and
compiles and executes the DAG of PCollections into a series of MapReduce jobs.
MRPipeline has three constructors that are commonly
used:
@@ -1435,7 +1446,7 @@ aware of:
<a name="sparkpipeline"></a>
### SparkPipeline
-The `SparkPipeline` is the newest implementation of the Pipeline interface,
and was added in Crunch 0.9.0. It has two default constructors:
+The `SparkPipeline` is the newest implementation of the Pipeline interface,
and was added in Crunch 0.10.0. It has two default constructors:
1. `SparkPipeline(String sparkConnection, String appName)` which takes a Spark
connection string, which is of the form `local[numThreads]` for
local mode or `master:port` for a Spark cluster. This constructor will create
its own `JavaSparkContext` instance to control the Spark pipeline
@@ -1465,7 +1476,7 @@ actively working to ensure complete comp
<a name="mempipeline"></a>
### MemPipeline
-The [MemPipeline](apidocs/0.9.0/org/apache/crunch/impl/mem/MemPipeline.html)
implementation of Pipeline has a few interesting
+The [MemPipeline](apidocs/0.10.0/org/apache/crunch/impl/mem/MemPipeline.html)
implementation of Pipeline has a few interesting
properties. First, unlike MRPipeline, MemPipeline is a singleton; you don't
create a MemPipeline, you just get a reference to it
via the static `MemPipeline.getInstance()` method. Second, all of the
operations in the MemPipeline are executed completely in-memory,
there is no serialization of data to disk by default, and PType usage is
fairly minimal. This has both benefits and drawbacks; on
@@ -1509,9 +1520,9 @@ interface has several tools to help deve
### Unit Testing DoFns
Many of the DoFn implementations, such as `MapFn` and `FilterFn`, are very
easy to test, since they accept a single input
-and return a single output. For general purpose DoFns, we need an instance of
the [Emitter](apidocs/0.9.0/org/apache/crunch/Emitter.html)
+and return a single output. For general purpose DoFns, we need an instance of
the [Emitter](apidocs/0.10.0/org/apache/crunch/Emitter.html)
interface that we can pass to the DoFn's `process` method and then read in the
values that are written by the function. Support
-for this pattern is provided by the
[InMemoryEmitter](apidocs/0.9.0/org/apache/crunch/impl/mem/emit/InMemoryEmitter.html)
class, which
+for this pattern is provided by the
[InMemoryEmitter](apidocs/0.10.0/org/apache/crunch/impl/mem/emit/InMemoryEmitter.html)
class, which
has a `List<T> getOutput()` method that can be used to read the values that
were passed to the Emitter instance by a DoFn instance:
@Test