[FLINK-1328] [docs] Updated documentation of semantic properties.
Improved documentation of key specification and data type.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4c2c4260
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4c2c4260
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4c2c4260

Branch: refs/heads/master
Commit: 4c2c426085ebaa0236b91a733456baf606ae0edd
Parents: de8e066
Author: Fabian Hueske <[email protected]>
Authored: Wed Jan 28 01:19:27 2015 +0100
Committer: Fabian Hueske <[email protected]>
Committed: Wed Jan 28 01:39:01 2015 +0100

----------------------------------------------------------------------
 docs/programming_guide.md | 690 +++++++++++++++++++++--------------------
 1 file changed, 353 insertions(+), 337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4c2c4260/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index cf91751..9f0d442 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -901,8 +901,7 @@ possible for [Data Sources](#data-sources) and [Data 
Sinks](#data-sinks).
 Specifying Keys
 -------------
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+
 
 Some transformations (join, coGroup) require that a key is defined on
 its argument DataSets, and other transformations (Reduce, GroupReduce,
@@ -927,6 +926,9 @@ actual data to guide the grouping operator.
 
 The simplest case is grouping a data set of Tuples on one or more
 fields of the Tuple:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 DataSet<Tuple3<Integer,String,Long>> input = // [...]
 DataSet<Tuple3<Integer,String,Long>> grouped = input
@@ -935,7 +937,7 @@ DataSet<Tuple3<Integer,String,Long> grouped = input
 {% endhighlight %}
 
 The data set is grouped on the first field of the tuples (the one of
-Integer type). The GroupReduceFunction will thus receive groups of tuples with
+Integer type). The GroupReduce function will thus receive groups of tuples with
 the same value in the first field.
 
 {% highlight java %}
@@ -946,23 +948,64 @@ DataSet<Tuple3<Integer,String,Long> grouped = input
 {% endhighlight %}
 
 The data set is grouped on the composite key consisting of the first and the
-second fields, therefore the GroupReduceFuntion will receive groups
+second field. Therefore, the GroupReduce function will receive groups
 with the same value for both fields.
 
 A note on nested Tuples: If you have a DataSet with a nested tuple, such as:
+
 {% highlight java %}
 DataSet<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
 {% endhighlight %}
-Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a 
key (with the Integer and Float being the key). If you want to "navigate" into 
the nested `Tuple2`, you have to use a string-based expression, as explained 
below. For this particular example, you would have to specfiy `f0.f0`.
 
-### Define key using a String Expression
+Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a 
key (with the Integer and Float being the key). If you want to "navigate" into 
the nested `Tuple2`, you have to use field expression keys, which are explained 
below.
+
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataSet[(Int, String, Long)] = // [...]
+val grouped = input
+  .groupBy(0)
+  .reduceGroup(/*do something*/)
+{% endhighlight %}
+
+The data set is grouped on the first field of the tuples (the one of
+`Int` type). The GroupReduce function will thus receive groups of tuples with
+the same value in the first field.
+
+{% highlight scala %}
+val input: DataSet[(Int, String, Long)] = // [...]
+val grouped = input
+  .groupBy(0,1)
+  .reduce(/*do something*/)
+{% endhighlight %}
+
+The data set is grouped on the composite key consisting of the first and the
+second field. Therefore, the GroupReduce function will receive groups
+with the same value for both fields.
+
+A note on nested Tuples: If you have a DataSet with a nested tuple, such as:
+
+{% highlight scala %}
+val ds: DataSet[((Int, Float), String, Long)]
+{% endhighlight %}
+
+Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a 
key (with the Int and
+Float being the key). If you want to "navigate" into the nested `Tuple2`, you 
have to use field expression keys, which are explained below.
+
+</div>
+</div>
+
+### Define keys using Field Expressions
 {:.no_toc}
 
-Starting from release 0.7-incubating, you can use String-based key expressions 
to select keys.
+Starting from release 0.7-incubating, you can use String-based field 
expressions to reference nested fields and define keys for grouping, sorting, 
joining, or coGrouping. In addition, field expressions can be used to define 
[semantic function annotations](#semantic-annotations).
 
-The String expressions allow to specify the name of the field in a class you 
want to group by.
+Field expressions make it very easy to select fields in (nested) composite 
types such as [Tuple](#tuples-and-case-classes) and [POJO](#pojos) types.
 
-In the example below, we have a `WC` POJO with two fields "word" and "count". 
To group by the field "word", we just pass this name to the `groupBy()` 
function.
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+In the example below, we have a `WC` POJO with two fields "word" and "count". 
To group by the field `word`, we just pass its name to the `groupBy()` function.
 {% highlight java %}
 // some ordinary POJO (Plain old Java Object)
 public class WC {
@@ -973,20 +1016,17 @@ DataSet<WC> words = // [...]
 DataSet<WC> wordCounts = words.groupBy("word").reduce(/*do something*/);
 {% endhighlight %}
 
-**Conditions** for a class to be treated as a POJO by Flink:
+**Field Expression Syntax**:
+
+- Select POJO fields by their field name. For example `"user"` refers to the 
"user" field of a POJO type.
 
-- The class must be public
-- It must have a public constructor without arguments
-- All fields either have to be public or there must be getters and setters for 
all non-public fields. If the field name is `foo` the getter and setters must 
be called `getFoo()` and `setFoo()`.
+- Select Tuple fields by their field name or 0-offset field index. For example 
`"f0"` and `"5"` refer to the first and sixth field of a Java Tuple type, 
respectively.
 
-**Valid Expressions**:
+- You can select nested fields in POJOs and Tuples. For example `"user.zip"` 
refers to the "zip" field of a POJO which is stored in the "user" field of a 
POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported such 
as `"f1.user.zip"` or `"user.f3.1.zip"`.
 
-- You can select POJO fields by their field name
-- You can select Tuple fields by their field name as well. For example `f0` or 
`f5`.
-- You can select nested fields in POJOs and Tuples. Expressions like 
`user.zip` or `user.groupId` are valid. Flink also supports POJOs inside 
Tuples: `f1.user.zip`.
-- You can select all fields at each level. To select all fields, specify `*`. 
This also works for the nested case: `user.*`.
+- You can select the full type using the `"*"` wildcard expression. This 
also works for types which are not Tuple or POJO types.
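
The syntax rules above can be illustrated with a small, self-contained sketch. It is not Flink's implementation: `FieldExpressionSketch`, `resolve`, and the `User`, `Order`, and `Pair` classes are hypothetical names introduced only for this illustration, and the sketch assumes that all fields are public. It shows how a dot-separated expression is walked one field at a time, how a bare number is read as a 0-offset tuple index, and how `"*"` selects the full type.

```java
import java.lang.reflect.Field;

// Illustrative only: NOT Flink's implementation of field expressions.
final class FieldExpressionSketch {

    // Hypothetical POJOs with public fields, mirroring the "user.zip" example.
    public static class User {
        public String zip = "10115";
    }

    public static class Order {
        public User user = new User();
    }

    // Hypothetical tuple-like type using Java Tuple style field names f0, f1.
    public static class Pair {
        public Object f0 = "first";
        public Object f1 = new Order();
    }

    /** Walks a dot-separated field expression over public fields. */
    static Object resolve(Object root, String expression) {
        if (expression.equals("*")) {
            return root; // the wildcard selects the full type
        }
        Object current = root;
        for (String part : expression.split("\\.")) {
            // A bare number is a 0-offset index, so "1" means field "f1".
            String name = part.matches("\\d+") ? "f" + part : part;
            try {
                Field field = current.getClass().getField(name);
                current = field.get(current);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException(
                    "Invalid field expression: " + expression, e);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        System.out.println(resolve(new Order(), "user.zip"));
        System.out.println(resolve(new Pair(), "1.user.zip"));
    }
}
```

In this sketch, `"f1.user.zip"` and `"1.user.zip"` reach the same nested field, which mirrors the mixing of field names and field indexes described in the list.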
 
-**Example for nested POJO**
+**Field Expression Example**:
 
 {% highlight java %}
 public static class WC {
@@ -1008,127 +1048,20 @@ public static class ComplexNestedClass {
 }
 {% endhighlight %}
 
-These are valid expressions for the example POJO above:
+These are valid field expressions for the example code above:
 
-- `count`: The count field in the `WC` class.
-- `complex.*`: Selects all fields in the `ComplexNestedClass`.
-- `complex.word.f2`: Selects the last field in the Tuple3.
-- `complex.hadoopCitizen`: Selects a Hadoop-`Writable` type as a key.
-
-Please note that you can only use types inside POJOs that Flink is able to 
serialize. Currently, we are using [Avro](http://avro.apache.org) to serialize 
arbitrary objects (such as `Date`).
-
-### Define key using a Key Selector Function
-{:.no_toc}
-
-An additional way to define keys are "key selector" functions, which
-takes as argument one dataset element and returns a key of an
-arbitrary data type by performing an arbitrary computation on this
-element. For example:
-{% highlight java %}
-// some ordinary POJO
-public class WC {public String word; public int count;}
-DataSet<WC> words = // [...]
-DataSet<WC> wordCounts = words
-                         .groupBy(
-                           new KeySelector<WC, String>() {
-                             public String getKey(WC wc) { return wc.word; }
-                           })
-                         .reduce(/*do something*/);
-{% endhighlight %}
+- `"count"`: The count field in the `WC` class.
 
-Remember that keys are not only used for grouping, but also joining and 
matching data sets:
-{% highlight java %}
-// some POJO
-public class Rating {
-  public String name;
-  public String category;
-  public int points;
-}
-DataSet<Rating> ratings = // [...]
-DataSet<Tuple2<String, Double>> weights = // [...]
-DataSet<Tuple2<String, Double>>
-            weightedRatings =
-            ratings.join(weights)
+- `"complex"`: Recursively selects all fields of the field `complex`, which is 
of POJO type `ComplexNestedClass`.
 
-                   // key of the first input
-                   .where(new KeySelector<Rating, String>() {
-                            public String getKey(Rating r) { return 
r.category; }
-                          })
+- `"complex.word.f2"`: Selects the last field of the nested `Tuple3`.
 
-                   // key of the second input
-                   .equalTo(new KeySelector<Tuple2<String, Double>, String>() {
-                              public String getKey(Tuple2<String, Double> t) { 
return t.f0; }
-                            });
-{% endhighlight %}
+- `"complex.hadoopCitizen"`: Selects the Hadoop `IntWritable` type.
 
 </div>
-
 <div data-lang="scala" markdown="1">
-Some transformations (join, coGroup) require that a key is defined on
-its argument DataSets, and other transformations (Reduce, GroupReduce,
-Aggregate) allow that the DataSet is grouped on a key before they are
-applied.
-
-A DataSet is grouped as
-{% highlight scala %}
-val input: DataSet[...] = // [...]
-val reduced = input
-  .groupBy(/*define key here*/)
-  .reduceGroup(/*do something*/)
-{% endhighlight %}
-
-The data model of Flink is not based on key-value pairs. Therefore,
-you do not need to physically pack the data set types into keys and
-values. Keys are "virtual": they are defined as functions over the
-actual data to guide the grouping operator.
-
-### Define keys for Tuples
-{:.no_toc}
-
-The simplest case is grouping a data set of Tuples on one or more
-fields of the Tuple:
-{% highlight scala %}
-val input: DataSet[(Int, String, Long)] = // [...]
-val grouped = input
-  .groupBy(0)
-  .reduceGroup(/*do something*/)
-{% endhighlight %}
-
-The data set is grouped on the first field of the tuples (the one of
-Integer type). The group-reduce function will thus receive groups of tuples 
with
-the same value in the first field.
-
-{% highlight scala %}
-val input: DataSet[(Int, String, Long)] = // [...]
-val grouped = input
-  .groupBy(0,1)
-  .reduce(/*do something*/)
-{% endhighlight %}
-
-The data set is grouped on the composite key consisting of the first and the
-second fields, therefore the group-reduce function will receive groups
-with the same value for both fields.
-
-A note on nested Tuples: If you have a DataSet with a nested tuple, such as:
-{% highlight scala %}
-val ds: DataSet[((Int, Float), String, Long)]
-{% endhighlight %}
-
-Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a 
key (with the Int and
-Float being the key). If you want to "navigate" into the nested `Tuple2`, you 
have to use a
-string-based expression, as explained below. For this particular example, you 
would have to specfiy
-`"_1._1"`.
-
-### Define key using a String Expression
-{:.no_toc}
-
-Starting from release 0.7-incubating, you can use String-based key expressions 
to select keys.
-
-The String expressions allow to specify the name of the field in a class you 
want to group by.
-
-In the example below, we have a `WC` POJO with two fields "word" and "count". 
To group by the field
-"word", we just pass this name to the `groupBy()` function.
 
+In the example below, we have a `WC` POJO with two fields "word" and "count". 
To group by the field `word`, we just pass its name to the `groupBy()` function.
 {% highlight scala %}
 // some ordinary POJO (Plain old Java Object)
 class WC(var word: String, var count: Int) {
@@ -1143,30 +1076,23 @@ val words: DataSet[WC] = // [...]
 val wordCounts = words.groupBy("word").reduce(/*do something*/)
 {% endhighlight %}
 
-**Conditions** for a class to enable using field selection expressions:
+**Field Expression Syntax**:
 
-- The class must be public
-- It must have a public constructor without arguments or be a case class.
-- All fields either have to be public or there must be getters and setters for 
all non-public
- fields. If the field name is `foo` the getter and setters must be called 
`foo` and `foo_=`. This
- is what normally gets generated when you hava a `var foo` in your class. This 
also automatically
- applies to case classes since the getters and setters are automatically 
generated.
+- Select POJO fields by their field name. For example `"user"` refers to the 
"user" field of a POJO type.
 
-**Valid Expressions**:
+- Select Tuple fields by their 1-offset field name or 0-offset field index. 
For example `"_1"` and `"5"` refer to the first and sixth field of a Scala 
Tuple type, respectively.
 
-- You can select POJO fields by their field name
-- You can select Tuple fields by their field name as well. For example `_1` or 
`_6`.
-- You can select nested fields in POJOs and Tuples. Expressions like 
`user.zip` or `user.groupId`
-  are valid. Flink also supports POJOs inside Tuples: `_2.user.zip`.
-- You can select all fields at each level. To select all fields, specify `*`. 
This also works for
-  the nested case: `user.*`.
+- You can select nested fields in POJOs and Tuples. For example `"user.zip"` 
refers to the "zip" field of a POJO which is stored in the "user" field of a 
POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported such 
as `"_2.user.zip"` or `"user._4.1.zip"`.
 
-**Example for nested POJO**
+- You can select the full type using the `"_"` wildcard expression. This 
also works for types which are not Tuple or POJO types.
+
+**Field Expression Example**:
 
 {% highlight scala %}
 class WC(var complex: ComplexNestedClass, var count: Int) {
   def this() { this(null, 0) }
 }
+
 class ComplexNestedClass(
     var someNumber: Int,
     someFloat: Float,
@@ -1176,23 +1102,43 @@ class ComplexNestedClass(
 }
 {% endhighlight %}
 
-These are valid expressions for the example POJO above:
+These are valid field expressions for the example code above:
+
+- `"count"`: The count field in the `WC` class.
 
-- `count`: The count field in the `WC` class.
-- `complex.*`: Selects all fields in the `ComplexNestedClass`.
-- `complex.word._3`: Selects the last field in the Tuple3.
-- `complex.hadoopCitizen`: Selects a Hadoop-`Writable` type as a key.
+- `"complex"`: Recursively selects all fields of the field `complex`, which is 
of POJO type `ComplexNestedClass`.
 
-Please note that you can only use types inside POJOs that Flink is able to 
serialize. Currently,
-we are using [Avro](http://avro.apache.org) to serialize arbitrary objects 
(such as `Date`).
+- `"complex.word._3"`: Selects the last field of the nested `Tuple3`.
 
-### Define key using a Key Selector Function
+- `"complex.hadoopCitizen"`: Selects the Hadoop `IntWritable` type.
+
+</div>
+</div>
+
+### Define keys using Key Selector Functions
 {:.no_toc}
 
-An additional way to define keys are "key selector" functions, which
-takes as argument one dataset element and returns a key of an
-arbitrary data type by performing an arbitrary computation on this
-element. For example:
+An additional way to define keys is to use "key selector" functions. A key 
selector function 
+takes a single dataset element as input and returns the key for the element. 
The key can be of any type and be derived from arbitrary computations. 
+
+The following example shows a key selector function that simply returns a 
field of an object:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// some ordinary POJO
+public class WC {public String word; public int count;}
+DataSet<WC> words = // [...]
+DataSet<WC> wordCounts = words
+                         .groupBy(
+                           new KeySelector<WC, String>() {
+                             public String getKey(WC wc) { return wc.word; }
+                           })
+                         .reduce(/*do something*/);
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
 {% highlight scala %}
 // some ordinary case class
 case class WC(word: String, count: Int)
@@ -1200,16 +1146,6 @@ val words: DataSet[WC] = // [...]
 val wordCounts = words
   .groupBy( _.word ).reduce(/*do something*/)
 {% endhighlight %}
-
-Remember that keys are not only used for grouping, but also joining and 
matching data sets:
-{% highlight scala %}
-// some case class
-case class Rating(name: String, category: String, points: Int)
-
-val ratings: DataSet[Rating] = // [...]
-val weights: DataSet[(String, Double)] = // [...]
-val weightedRatings = ratings.join(weights).where("category").equalTo("_1")
-{% endhighlight %}
 </div>
 </div>
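
The idea that a key is a function over the element, rather than a separate part of a key-value pair, can also be sketched without Flink. The following is only an analogy built on plain Java streams; `KeySelectorSketch` and `countsByKey` are hypothetical names and none of this is Flink API. It groups `WC` elements by whatever key the selector function computes and sums the counts per key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Illustrative analogy using java.util.stream: NOT the Flink DataSet API.
final class KeySelectorSketch {

    // Same shape as the WC POJO used in the examples above.
    public static class WC {
        public final String word;
        public final int count;

        public WC(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    /** Groups the elements by the selected key and sums the counts per key. */
    static <K> Map<K, Integer> countsByKey(List<WC> words, Function<WC, K> keySelector) {
        return words.stream().collect(
            Collectors.groupingBy(
                keySelector,                             // the "virtual" key
                TreeMap::new,                            // sorted for stable output
                Collectors.summingInt(wc -> wc.count))); // per-group reduction
    }

    public static void main(String[] args) {
        List<WC> words = Arrays.asList(new WC("a", 1), new WC("b", 2), new WC("a", 3));
        // Key is a field of the element.
        System.out.println(countsByKey(words, wc -> wc.word));
        // Key is derived by a computation on the element.
        System.out.println(countsByKey(words, wc -> wc.word.length()));
    }
}
```

The second call shows that the key does not have to be a field at all. Any value computed from the element can serve as the key, provided its type can be compared for grouping.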
 
@@ -1369,192 +1305,147 @@ Flink places some restrictions on the type of 
elements that are used in DataSets
 of transformations. The reason for this is that the system analyzes the types 
to determine
 efficient execution strategies.
 
-There are four different categories of data types, which are treated slightly 
different when it
-to [specifying keys](#specifying-keys):
-
-1. **General Types and POJOs**
-2. **Tuples**/**Case Classes**
-3. **Values**
-4. **Hadoop Writables**
+There are six different categories of data types:
 
+1. **Java Tuples** and **Scala Case Classes**
+2. **Java POJOs**
+3. **Primitive Types**
+4. **General Class Types**
+5. **Values**
+6. **Hadoop Writables**
 
-#### General Types
-
-Out of the box, Flink supports all primitive types of your programming 
language of choice.
-
-Furthermore, you can use the vast majority of custom classes. Restrictions 
apply to classes
-containing fields that cannot be serialized, like File pointers, I/O streams, 
or other native
-resources. Classes that follow the Java Beans conventions work well in 
general. The following
-defines a simple example class to illustrate how you can use custom classes:
+#### Tuples and Case Classes
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-{% highlight java %}
-public class WordWithCount {
 
-    public String word;
-    public int count;
-
-    public WordCount() {}
-
-    public WordCount(String word, int count) {
-        this.word = word;
-        this.count = count;
-    }
-}
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class WordWithCount(val word: String, val count: Int) {
-    def this() {
-      this(null, -1)
-    }
-}
-{% endhighlight %}
-</div>
-</div>
-
-You can use all of those types to parameterize DataSet and function 
implementations, e.g.
-`DataSet` or a `MapFunction`.
+Tuples are composite types that contain a fixed number of fields with various 
types. 
+The Java API provides classes from `Tuple1` up to `Tuple25`. Every field of a 
tuple
+can be an arbitrary Flink type including further tuples, resulting in nested 
tuples. Fields of a
+tuple can be accessed directly using the field's name as `tuple.f4`, or using 
the generic getter method
+`tuple.getField(int position)`. The field indices start at 0. Note that this 
stands in contrast
+to the Scala tuples, but it is more consistent with Java's general indexing.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-// using a basic data type
-DataSet<String> numbers = env.fromElements("1", "2");
+DataSet<Tuple2<String, Integer>> wordCounts = env.fromElements(
+    new Tuple2<String, Integer>("hello", 1),
+    new Tuple2<String, Integer>("world", 2));
 
-numbers.map(new MapFunction<String, Integer>() {
+wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
     @Override
-    public String map(String value) throws Exception {
-        return Integer.parseInt(value);
+    public Integer map(Tuple2<String, Integer> value) throws Exception {
+        return value.f1;
     }
 });
+{% endhighlight %}
 
-// using a custom class
-DataSet<WordCount> wordCounts = env.fromElements(
-    new WordCount("hello", 1),
-    new WordCount("world", 2));
+When grouping, sorting, or joining a data set of tuples, keys can be specified 
as field positions or field expressions. See the [key definition 
section](#specifying-keys) or [data transformation section](#transformations) 
for details.
 
-wordCounts.map(new MapFunction<WordCount, Integer>() {
-    @Override
-    public String map(WordCount value) throws Exception {
-        return value.count;
-    }
-});
+{% highlight java %}
+wordCounts
+    .groupBy(0) // also valid .groupBy("f0")
+    .reduce(new MyReduceFunction());
 {% endhighlight %}
+
 </div>
 <div data-lang="scala" markdown="1">
+
+Scala case classes (and Scala tuples, which are a special case of case 
classes) are composite types that contain a fixed number of fields with 
various types. Tuple fields are addressed by their 1-offset names such as `_1` 
for the first field. Case class fields are accessed by their name.
+
 {% highlight scala %}
-// using a primitive data type
-// Note that the type ascription "DataSet[String]" can be omitted in Scala
-// it is just given to clarify the type of numbers
-val numbers: DataSet[String] = env.fromElements("1", "2")
-
-numbers.map(new MapFunction[String, Int]() {
-    def map(in: String): Int = {
-      in.toInt
-    }
-})
+case class WordCount(word: String, count: Int)
+val input = env.fromElements(
+    WordCount("hello", 1),
+    WordCount("world", 2)) // Case Class Data Set
 
-// using a custom class
-val wordCounts = env.fromElements(
-  new WordCount("hello", 1),
-  new WordCount("world", 2))
+input.groupBy("word").reduce(...) // group by field expression "word"
 
-wordCounts.map { _.count }
+val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
+
+input2.groupBy(0, 1).reduce(...) // group by field positions 0 and 1
 {% endhighlight %}
+
+When grouping, sorting, or joining a data set of tuples, keys can be specified 
as field positions or field expressions. See the [key definition 
section](#specifying-keys) or [data transformation section](#transformations) 
for details.
+
 </div>
 </div>
 
+#### POJOs
+
+Java and Scala classes are treated by Flink as a special POJO data type if 
they fulfill the following requirements:
+
+- The class must be public.
 
-When working with operators that require a Key for grouping or matching records
-you can select the key using a key expression (see
-[Specifying Keys](#specifying-keys)).
+- It must have a public constructor without arguments (default constructor).
+
+- All fields are either public or must be accessible through getter and setter 
functions. For a field called `foo` the getter and setter methods must be named 
`getFoo()` and `setFoo()`.
+
+- The type of a field must be supported by Flink. At the moment, Flink uses 
[Avro](http://avro.apache.org) to serialize arbitrary objects (such as `Date`).
+
+Flink analyzes the structure of POJO types, i.e., it learns about the fields 
of a POJO. As a result, POJO types are easier to use than general types. 
Moreover, Flink can process POJOs more efficiently than general types.
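
The first three requirements above can be approximated with plain Java reflection. The sketch below is an assumption-laden illustration and not Flink's type analysis: `PojoRulesSketch`, `looksLikePojo`, and the sample classes are hypothetical names, only fields declared directly on the class are inspected, and the fourth requirement (field types supported by Flink) is not checked at all.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Illustrative only: approximates the listed rules, NOT Flink's type analysis.
final class PojoRulesSketch {

    // Satisfies the rules: public class, default constructor, public fields.
    public static class WordWithCount {
        public String word;
        public int count;

        public WordWithCount() {}
    }

    // Satisfies the rules through a getter and setter for the private field.
    public static class Counter {
        private int count;

        public Counter() {}

        public int getCount() { return count; }

        public void setCount(int count) { this.count = count; }
    }

    // Violates the rules: no public constructor without arguments.
    public static class NoDefaultConstructor {
        public String word;

        public NoDefaultConstructor(String word) { this.word = word; }
    }

    /** Checks the first three requirements for fields declared on the class itself. */
    static boolean looksLikePojo(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false; // the class must be public
        }
        try {
            type.getConstructor(); // public constructor without arguments
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : type.getDeclaredFields()) {
            int modifiers = field.getModifiers();
            if (Modifier.isStatic(modifiers) || field.isSynthetic()
                    || Modifier.isPublic(modifiers)) {
                continue; // static and public fields need no accessors
            }
            String name = field.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                type.getMethod("get" + suffix);                  // getFoo()
                type.getMethod("set" + suffix, field.getType()); // setFoo(value)
            } catch (NoSuchMethodException e) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(WordWithCount.class));
        System.out.println(looksLikePojo(Counter.class));
        System.out.println(looksLikePojo(NoDefaultConstructor.class));
    }
}
```

A class that fails such a check is not rejected by Flink. According to the text, it is handled as a general class type instead.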
+
+The following example shows a simple POJO with two public fields.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-wordCounts.groupBy("word").reduce(new MyReduceFunction());
+public class WordWithCount {
+
+    public String word;
+    public int count;
+
+    public WordWithCount() {}
+
+    public WordWithCount(String word, int count) {
+        this.word = word;
+        this.count = count;
+    }
+}
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-wordCounts groupBy { _.word } reduce(new MyReduceFunction())
+class WordWithCount(val word: String, val count: Int) {
+    def this() {
+      this(null, -1)
+    }
+}
 {% endhighlight %}
 </div>
 </div>
 
-
-#### Tuples/Case Classes
+When grouping, sorting, or joining a data set of POJO types, keys can be 
specified with field expressions. See the [key definition 
section](#specifying-keys) or [data transformation section](#transformations) 
for details.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-
-You can use the Tuple classes for composite types. Tuples contain a fix number 
of fields of
-various types. The Java API provides classes from `Tuple1` up to `Tuple25`. 
Every field of a tuple
-can be an arbitrary Flink type - including further tuples, resulting in nested 
tuples. Fields of a
-Tuple can be accessed directly using the fields `tuple.f4`, or using the 
generic getter method
-`tuple.getField(int position)`. The field numbering starts with 0. Note that 
this stands in contrast
-to the Scala tuples, but it is more consistent with Java's general indexing.
-
-{% highlight java %}
-DataSet<Tuple2<String, Integer>> wordCounts = env.fromElements(
-    new Tuple2<String, Integer>("hello", 1),
-    new Tuple2<String, Integer>("world", 2));
-
-wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
-    @Override
-    public String map(Tuple2<String, Integer> value) throws Exception {
-        return value.f1;
-    }
-});
-{% endhighlight %}
-
-When working with operators that require a Key for grouping or matching 
records,
-Tuples let you simply specify the positions of the fields to be used as key. 
You can specify more
-than one position to use composite keys (see [Section Data 
Transformations](#transformations)).
-
 {% highlight java %}
 wordCounts
-    .groupBy(0) // also valid .groupBy("f0")
+    .groupBy("word")                    // group by field expression "word"
     .reduce(new MyReduceFunction());
 {% endhighlight %}
-Also, you can "navigate" into nested tuples using (String) key expressions.
-
-In order to access fields more intuitively and to generate more readable code, 
it is also possible
-to extend a subclass of Tuple. You can add getters and setters with custom 
names that delegate to
-the field positions. See this
-{% gh_link 
/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
 "example" %} for an
-illustration how to make use of that mechanism.
-
-Note that if you are extending from a Tuple and add fields to your class, it 
will be treated as a POJO.
-
 </div>
 <div data-lang="scala" markdown="1">
+{% highlight scala %}
+wordCounts groupBy { _.word } reduce(new MyReduceFunction())
+{% endhighlight %}
+</div>
+</div>
 
-Flink has special support for Scala's Case Classes and Tuples. When using 
working with an operator
-that required a key for grouping or matching records this key can be specified 
using tuple field
-positions or field names: 
+#### Primitive Types
 
-{% highlight scala %}
-case class WordCount(word: String, count: Int)
-val input = env.fromElements(
-    WordCount("hello", 1),
-    WordCount("world", 2))
+Flink supports all Java and Scala primitive types such as `Integer`, `String`, 
and `Double`. 
 
-input.groupBy("word").reduce(...)
+#### General Class Types
 
-val input2 = env.fromElements(("hello", 1), ("world", 2))
+Flink supports most Java and Scala classes (API and custom). 
+Restrictions apply to classes containing fields that cannot be serialized, 
like file pointers, I/O streams, or other native
+resources. Classes that follow the Java Beans conventions work well in general.
 
-input2.groupBy(0, 1).reduce(...)
-{% endhighlight %}
+All classes that are not identified as POJO types (see POJO requirements 
above) are handled by Flink as general class types. 
+Flink treats these data types as black boxes and is not able to access their 
content (e.g., for efficient sorting). General types are de/serialized 
using the serialization framework 
[Kryo](https://github.com/EsotericSoftware/kryo). 
 
-Both variants allow specifying more than one key field name or key field 
position. See
-[specifying keys](#specifying-keys) for more details.
+When grouping, sorting, or joining a data set of generic types, keys must be 
specified with key selector functions. See the [key definition 
section](#specifying-keys) or [data transformation section](#transformations) 
for details.
 
-</div>
-</div>
 
 #### Values
 
@@ -2257,36 +2148,81 @@ env.execute()
 Semantic Annotations
 -----------
 
-Semantic Annotations give hints about the behavior of a function by telling 
the system which fields
-in the input are accessed and which are constant between input and output data 
of a function (copied
-but not modified). Semantic annotations are a powerful means to speed up 
execution, because they
+Semantic annotations can be used to give Flink hints about the behavior of a 
function. 
+They tell the system which fields of a function's input the function reads and 
evaluates and
+which fields it forwards unmodified from its input to its output. 
+Semantic annotations are a powerful means to speed up execution, because they
 allow the system to reason about reusing sort orders or partitions across 
multiple operations. Using
 semantic annotations may eventually save the program from unnecessary data 
shuffling or unnecessary
-sorts.
+sorts and significantly improve the performance of a program.
+
+**Note:** The use of semantic annotations is optional. However, it is 
absolutely crucial to 
+be conservative when providing semantic annotations! 
+Incorrect semantic annotations will cause Flink to make incorrect assumptions 
about your program and 
+might eventually lead to incorrect results. 
+If the behavior of an operator is not clearly predictable, no annotation 
should be provided.
+Please read the documentation carefully.
+
+The following semantic annotations are currently supported.
+
+#### Forwarded Fields Annotation
+
+Forwarded fields information declares input fields which are forwarded 
unmodified by a function to the same position or to another position in the 
output. 
+This information is used by the optimizer to infer whether a data property 
such as sorting or 
+partitioning is preserved by a function.
+
+Field forward information is specified using [field 
expressions](#define-keys-using-field-expressions).
+Fields that are forwarded to the same position in the output can be specified 
by their position. 
+The specified position must be valid for the input and output data type and 
have the same type.
+For example, the String `"f2"` declares that the third field of a Java input 
tuple is always equal to the third field in the output tuple.
+
+Fields which are forwarded unmodified to another position in the output are 
declared by specifying the
+source field in the input and the target field in the output as field 
expressions.
+The String `"f0->f2"` denotes that the first field of the Java input tuple is
+copied unchanged to the third field of the Java output tuple. The wildcard 
expression `*` can be used to refer to a whole input or output type, e.g., 
`"f0->*"` denotes that the output of a function is always equal to the first 
field of its Java input tuple.
+
+Multiple forwarded fields can be declared in a single String by separating 
them with semicolons as `"f0; f2->f1; f3->f2"` or in separate Strings `"f0", 
"f2->f1", "f3->f2"`. When specifying forwarded fields it is not required that 
all forwarded fields are declared, but all declarations must be correct.
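+
The declaration syntax described above can be illustrated with a small, self-contained sketch. `ForwardedFieldsSketch` and its `parse` method are hypothetical names invented for this illustration. They are not part of Flink and do not reflect how the optimizer actually reads annotations. The sketch only shows that the single-String form and the multi-String form describe the same source-to-target mapping.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ForwardedFieldsSketch {

    // Hypothetical helper (not a Flink API): expands declarations such as
    // "f0; f2->f1" into a map from source field to target field.
    static Map<String, String> parse(String... declarations) {
        Map<String, String> forwarded = new LinkedHashMap<>();
        for (String declaration : declarations) {
            // A single String may hold several expressions separated by semicolons.
            for (String part : declaration.split(";")) {
                String expr = part.trim();
                if (expr.isEmpty()) {
                    continue;
                }
                int arrow = expr.indexOf("->");
                if (arrow < 0) {
                    // No arrow: the field is forwarded to the same position.
                    forwarded.put(expr, expr);
                } else {
                    // "source->target": the field is forwarded to another position.
                    String source = expr.substring(0, arrow).trim();
                    String target = expr.substring(arrow + 2).trim();
                    forwarded.put(source, target);
                }
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // Both calls print {f0=f0, f2=f1, f3=f2}
        System.out.println(parse("f0; f2->f1; f3->f2"));
        System.out.println(parse("f0", "f2->f1", "f3->f2"));
    }
}
```

The sketch does not validate positions or types against the input and output data types, which the real declarations must satisfy.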
+
+Forwarded field information can be declared by attaching Java annotations to function class definitions or 
+by passing them as operator arguments after invoking a function on a DataSet as shown below.
 
-Semantic annotations can be attached to functions through Annotations, or 
passed as arguments
-when invoking a function on a DataSet. The following example illustrates that:
+##### Function Class Annotations
+
+* `@ForwardedFields` for single input functions such as Map and Reduce.
+* `@ForwardedFieldsFirst` for the first input of a function with two inputs such as Join and CoGroup.
+* `@ForwardedFieldsSecond` for the second input of a function with two inputs such as Join and CoGroup.
+
+##### Operator Arguments
+
+* `data.map(myMapFnc).withForwardedFields()` for single input functions such as Map and Reduce.
+* `data1.join(data2).where().equalTo().with(myJoinFnc).withForwardedFieldsFirst()` for the first input of a function with two inputs such as Join and CoGroup. 
+* `data1.join(data2).where().equalTo().with(myJoinFnc).withForwardedFieldsSecond()` for the second input of a function with two inputs such as Join and CoGroup.
+
+Please note that forwarded field information specified as a class annotation cannot be overwritten by operator arguments. 
+
+##### Example
+
+The following example shows how to declare forwarded field information using a 
function class annotation:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-@ConstantFields("1")
-public class DivideFirstbyTwo implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+@ForwardedFields("f0->f2")
+public class MyMap implements 
+              MapFunction<Tuple2<Integer, Integer>, Tuple3<String, Integer, Integer>> {
   @Override
-  public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) {
-    value.f0 /= 2;
-    return value;
+  public Tuple3<String, Integer, Integer> map(Tuple2<Integer, Integer> val) {
+    return new Tuple3<String, Integer, Integer>("foo", val.f1 / 2, val.f0);
   }
 }
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-@ConstantFields("1")
-class DivideFirstbyTwo extends MapFunction[(Int, Int), (Int, Int)]{
-   def map(input: (Int, Int): (Int, Int) = {
-    value.f0 /= 2;
-    (input._1 / 2, input._2)
+@ForwardedFields("_1->_3")
+class MyMap extends MapFunction[(Int, Int), (String, Int, Int)]{
+   def map(value: (Int, Int)): (String, Int, Int) = {
+    ("foo", value._2 / 2, value._1)
   }
 }
 {% endhighlight %}
@@ -2294,33 +2230,113 @@ class DivideFirstbyTwo extends MapFunction[(Int, Int), (Int, Int)]{
 </div>
 </div>
 
-The following annotations are currently available:
+#### Non-Forwarded Fields
 
-* `@ConstantFields`: Declares constant fields (forwarded/copied) for functions 
with a single input
-  data set (Map, Reduce, Filter, ...).
+Non-forwarded fields information declares all fields which are not preserved at the same position in a function's output. 
+The values of all other fields are considered to be preserved at the same position in the output. 
+Hence, non-forwarded fields information is the inverse of forwarded fields information.
 
-* `@ConstantFieldsFirst`: Declares constant fields (forwarded/copied) for 
functions with a two input
-  data sets (Join, CoGroup, ...), with respect to the first input data set.
+**IMPORTANT**: The specification of non-forwarded fields information is optional. However, if used, 
+**ALL** non-forwarded fields must be specified, because all other fields are considered to be forwarded in place. It is safe to declare a forwarded field as non-forwarded.
 
-* `@ConstantFieldsSecond`: Declares constant fields (forwarded/copied) for 
functions with a two
-  input data sets (Join, CoGroup, ...), with respect to the first second data 
set.
+Non-forwarded fields are specified as a list of [field expressions](#define-keys-using-field-expressions). The list can be given either as a single String with field expressions separated by semicolons or as multiple Strings. 
+For example, both `"f1; f3"` and `"f1", "f3"` declare that the second and fourth field of a Java tuple 
+are not preserved in place and all other fields are preserved in place. 
+Non-forwarded field information can only be specified for functions which have identical input and output types.
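+
To make the inverse relationship concrete, the following minimal sketch derives which positions are implicitly treated as forwarded in place once a set of non-forwarded fields has been declared. `NonForwardedFieldsSketch`, `impliedForwarded`, and the `arity` parameter are hypothetical names used only for this illustration, and the sketch assumes a flat Java tuple whose fields are named `f0`, `f1`, and so on. It is not how Flink evaluates the annotation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NonForwardedFieldsSketch {

    // Hypothetical helper (not a Flink API): given the number of tuple fields
    // and the declared non-forwarded fields, lists every field that is
    // therefore assumed to be preserved at the same position.
    static List<String> impliedForwarded(int arity, String... declarations) {
        Set<String> nonForwarded = new HashSet<>();
        for (String declaration : declarations) {
            // Accept both "f1; f3" and separate Strings "f1", "f3".
            for (String part : declaration.split(";")) {
                String expr = part.trim();
                if (!expr.isEmpty()) {
                    nonForwarded.add(expr);
                }
            }
        }
        List<String> forwarded = new ArrayList<>();
        for (int i = 0; i < arity; i++) {
            String field = "f" + i;
            if (!nonForwarded.contains(field)) {
                forwarded.add(field);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // For a four-field tuple, declaring f1 and f3 as non-forwarded
        // implies that f0 and f2 are forwarded in place: prints [f0, f2]
        System.out.println(impliedForwarded(4, "f1; f3"));
    }
}
```

The sketch shows why an incomplete declaration is dangerous: any field left out of the list ends up in the implied forwarded set, even if the function actually modifies it.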
 
-* `@ConstantFieldsExcept`: Declares that all fields are constant, except for 
the specified fields.
-  Applicable to functions with a single input data set.
+Non-forwarded field information is specified as function class annotations 
using the following annotations:
 
-* `@ConstantFieldsFirstExcept`: Declares that all fields of the first input 
are constant, except for
-  the specified fields. Applicable to functions with a two input data sets.
+* `@NonForwardedFields` for single input functions such as Map and Reduce.
+* `@NonForwardedFieldsFirst` for the first input of a function with two inputs 
such as Join and CoGroup.
+* `@NonForwardedFieldsSecond` for the second input of a function with two 
inputs such as Join and CoGroup.
 
-* `@ConstantFieldsSecondExcept`: Declares that all fields of the second input 
are constant, except
-  for the specified fields. Applicable to functions with a two input data sets.
+##### Example
 
-*(Note: The system currently only evaluates annotations only Tuple DataSets.  
This will be extended
-in the next versions)*
+The following example shows how to declare non-forwarded field information:
 
-**Note**: It is important to be conservative when providing annotations. Only 
annotate fields,
-when they are always constant for every call to the function. Otherwise the 
system has incorrect
-assumptions about the execution and the execution may produce wrong results. 
If the behavior of the
-operator is not clearly predictable, no annotation should be provided.
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@NonForwardedFields("f1") // second field is not forwarded
+public class MyMap implements 
+              MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+  @Override
+  public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> val) {
+    return new Tuple2<Integer, Integer>(val.f0, val.f1 / 2);
+  }
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@NonForwardedFields("_2") // second field is not forwarded
+class MyMap extends MapFunction[(Int, Int), (Int, Int)]{
+  def map(value: (Int, Int)): (Int, Int) = {
+    (value._1, value._2 / 2)
+  }
+}
+{% endhighlight %}
+
+</div>
+</div>
+
+#### Read Fields
+
+Read fields information declares all fields that are accessed and evaluated by 
a function, i.e.,
+all fields that are used by the function to compute its result.
+For example, fields which are evaluated in conditional statements or used for 
computations must be marked as read when specifying read fields information.
+Fields that are only forwarded unmodified to the output without their values being evaluated, and fields that are not accessed at all, are not considered to be read.
+
+**IMPORTANT**: The specification of read fields information is optional. However, if used, 
+**ALL** read fields must be specified. It is safe to declare a non-read field as read.
+
+Read fields are specified as a list of [field expressions](#define-keys-using-field-expressions). The list can be given either as a single String with field expressions separated by semicolons or as multiple Strings. 
+For example, both `"f1; f3"` and `"f1", "f3"` declare that the second and fourth field of a Java tuple are read and evaluated by the function.
+
+Read field information is specified as function class annotations using the 
following annotations:
+
+* `@ReadFields` for single input functions such as Map and Reduce.
+* `@ReadFieldsFirst` for the first input of a function with two inputs such as 
Join and CoGroup.
+* `@ReadFieldsSecond` for the second input of a function with two inputs such 
as Join and CoGroup.
+
+##### Example
+
+The following example shows how to declare read field information:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@ReadFields("f0; f3") // f0 and f3 are read and evaluated by the function. 
+public class MyMap implements 
+              MapFunction<Tuple4<Integer, Integer, Integer, Integer>, 
+                          Tuple2<Integer, Integer>> {
+  @Override
+  public Tuple2<Integer, Integer> map(Tuple4<Integer, Integer, Integer, Integer> val) {
+    if(val.f0 == 42) {
+      return new Tuple2<Integer, Integer>(val.f0, val.f1);
+    } else {
+      return new Tuple2<Integer, Integer>(val.f3+10, val.f1);
+    }
+  }
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+@ReadFields("_1; _4") // _1 and _4 are read and evaluated by the function.
+class MyMap extends MapFunction[(Int, Int, Int, Int), (Int, Int)]{
+   def map(value: (Int, Int, Int, Int)): (Int, Int) = {
+    if (value._1 == 42) {
+      (value._1, value._2)
+    } else {
+      (value._4 + 10, value._2)
+    }
+  }
+}
+{% endhighlight %}
+
+</div>
+</div>
 
 [Back to top](#top)
 
