[ 
https://issues.apache.org/jira/browse/BEAM-5124?focusedWorklogId=152512&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152512
 ]

ASF GitHub Bot logged work on BEAM-5124:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Oct/18 04:51
            Start Date: 09/Oct/18 04:51
    Worklog Time Spent: 10m 
      Work Description: VaclavPlajt closed pull request #540: [BEAM-5124] DSL 
Euphoria documentation update
URL: https://github.com/apache/beam-site/pull/540
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/src/documentation/sdks/euphoria.md 
b/src/documentation/sdks/euphoria.md
index 6a90b13fba..c3f9b6b681 100644
--- a/src/documentation/sdks/euphoria.md
+++ b/src/documentation/sdks/euphoria.md
@@ -18,72 +18,567 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 -->
 # Euphoria Java 8 DSL
+<!--
+NOTE for future maintainer.
+There is [`DocumentationExamplesTest`]({{ site.baseurl 
}}/documentation/sdks/javadoc/{{ site.release_latest 
}}/index.html?org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.html)
 in `beam-sdks-java-extensions-euphoria-core` project where all code examples 
are validated. Do not change the code examples without reflecting it in the 
`DocumentationExamplesTest` and vice versa.
 
-## What is Euphoria
-
-Easy to use Java 8 DSL for the Beam Java SDK. Provides a high-level 
abstraction of Beam transformations, which is both easy to read and write. Can 
be used as a complement to existing Beam pipelines (convertible back and forth).
-
-Integration of Euphoria API to Beam is in **progress** 
([BEAM-3900](https://issues.apache.org/jira/browse/BEAM-3900)).
+Following operator is unsupported. Include it in documentation when supported.
 
-## How to build
+Lower level transformations (if possible user should prefer above 
transformations):
+### `ReduceStateByKey`: assigns each input item to a set of windows and turns 
the item into a key/value pair.
+For each of the assigned windows the extracted value is accumulated using a 
user provided `StateFactory` state
+ implementation under the extracted key. I.e. the value is accumulated into a 
state identified by
+ a key/window pair.
+-->
 
-Euphoria is located in `dsl-euphoria` branch. To build `euphoria` subprojects 
use command:
+## What is Euphoria
+Easy to use Java 8 API build on top of the Beam's Java SDK. API provides a 
[high-level abstraction](#operator-reference) of data transformations, with 
focus on the Java 8 language features (e.g. lambdas and streams). It is fully 
inter-operable with existing Beam SDK and convertible back and forth. It allows 
fast prototyping through use of (optional) 
[Kryo](https://github.com/EsotericSoftware/kryo) based coders, lambdas and high 
level operators and can be [seamlessly 
integrated](#integration-of-euphoria-into-existing-pipelines) into existing 
Beam `Pipelines`.
 
-```
-./gradlew :beam-sdks-java-extensions-euphoria-beam:build 
-```
+[Euphoria API](https://github.com/seznam/euphoria) project has been started in 
2014, with a clear goal of providing the main building block for 
[Seznam.cz's](https://www.seznam.cz/) data infrastructure.
+In 2015, [DataFlow 
whitepaper](http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf) inspired original 
authors to go one step further and also provide the unified API for both stream 
and batch processing.
+The API has been open-sourced in 2016 and is still in active development. As 
the Beam's community goal was very similar, we decided to contribute
+the API as a high level DSL over Beam Java SDK and share our effort with the 
community.
 
-## WordCount example
+Euphoria DSL integration is still work in progress and is tracked as part of 
[BEAM-3900](https://issues.apache.org/jira/browse/BEAM-3900).
 
+## WordCount Example
+Lets start with the small example.
 ```java
+PipelineOptions options = PipelineOptionsFactory.create();
 Pipeline pipeline = Pipeline.create(options);
 
-// Transform to euphoria's flow.
-BeamFlow flow = BeamFlow.create(pipeline);
+// Use Kryo as coder fallback
+KryoCoderProvider.of().registerTo(pipeline);
 
 // Source of data loaded from Beam IO.
 PCollection<String> input =
-    
pipeline.apply(Create.of(inputs)).setTypeDescriptor(TypeDescriptor.of(String.class));
+    pipeline
+        .apply(Create.of(textLineByLine))
+        .setTypeDescriptor(TypeDescriptor.of(String.class));
+
 // Transform PCollection to euphoria's Dataset.
-Dataset<String> lines = flow.wrapped(input);
+Dataset<String> lines =  Dataset.of(input);
 
 // FlatMap processes one input element at a time and allows user code to emit
 // zero, one, or more output elements. From input lines we will get data set 
of words.
-Dataset<String> words = FlatMap.named("TOKENIZER")
-    .of(lines)
-    .using((String line, Collector<String> context) -> {
-      for (String word : line.split("\\s+")) {
-        context.collect(word);
-      }
+Dataset<String> words =
+    FlatMap.named("TOKENIZER")
+        .of(lines)
+        .using(
+            (String line, Collector<String> context) -> {
+              for (String word : Splitter.onPattern("\\s+").split(line)) {
+                context.collect(word);
+              }
+            })
+        .output();
+
+// Now we can count input words - the operator ensures that all values for the 
same
+// key (word in this case) end up being processed together. Then it counts 
number of appearances
+// of the same key in 'words' dataset and emits it to output.
+Dataset<KV<String, Long>> counted =
+    CountByKey.named("COUNT")
+        .of(words)
+        .keyBy(w -> w)
+        .output();
+
+// Format output.
+Dataset<String> output =
+    MapElements.named("FORMAT")
+        .of(counted)
+        .using(p -> p.getKey() + ": " + p.getValue())
+        .output();
+
+// Transform Dataset back to PCollection. It can be done anytime.
+PCollection<String> outputCollection = output.getPCollection();
+
+// Now we can again use Beam transformation. In this case we save words and 
their count
+// into the text file.
+outputCollection
+    .apply(TextIO.write()
+    .to("counted_words"));
+
+pipeline.run();
+```
+
+## Euphoria Guide
+
+Euphoria API is composed from a set of operators, which allows you to 
construct `Pipeline` according to your application needs.
+
+### Datasets
+Euphoria uses the concept of 'Datasets' to describe data pipeline between 
`Operators`. This concept is similar to Beam's `PCollection` and can be 
converted back and forth through:
+```java
+PCollection<T> someCollection = ...
+
+// PCollection -> Dataset
+Dataset<T> dataset = Dataset.of(someCollection);
+
+//And now back: Dataset -> PCollection
+PCollection<T> collection = dataset.getPCollection();
+```
+
+### Inputs and Outputs
+Input data can be supplied through Beams IO into `PCollection`, the same way 
as in Beam, and wrapped by `Dataset.of(PCollection<T> pCollection)` into 
`Dataset` later on.
+
+```java
+PCollection<String> input =
+  pipeline
+    .apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck"))
+    .setTypeDescriptor(TypeDescriptor.of(String.class));
+
+Dataset<String> dataset =  Dataset.of(input);
+```
+Outputs can be treated the same way as inputs, last `Dataset` is converted to 
`PCollection` and dumped into appropriate IO.
+
+### Adding Operators
+Real power of Euphoria API is in its [operators suite](#operator-reference). 
Once we get our hands on `Dataset` we are able to create and connect operators. 
Each Operator consumes one or more input and produces one output
+`Dataset`. Lets take a look at simple `MapElements` example.
+
+```java
+Dataset<Integer> input = ...
+
+Dataset<String> mappedElements =
+  MapElements
+    .named("Int2Str")
+    .of(input)
+    .using(String::valueOf)
+    .output();
+```
+The operator consumes `input`, it applies given lambda expression 
(`String::valueOf`) on each element of `input` and returns mapped `Dataset`. 
Developer is guided through series of steps when creating operator so the 
declaration of an operator is straightforward. To start building operator just 
wrote its name and '.' (dot). Your IDE will give you hints.
+
+First step to build any operator is to give it a name through `named()` 
method. The name is propagated through system and can latter be used when 
debugging.
+
+### Coders and Types
+Beam's Java SDK requires developers to supply `Coder` for custom element type 
in order to have a way of materializing elements. Euphoria allows to use 
[Kryo](https://github.com/EsotericSoftware/kryo) as a way of serialization. The 
[Kryo](https://github.com/EsotericSoftware/kryo) is located in 
`:beam-sdks-java-extensions-kryo` module.
+
+```groovy
+//gradle
+dependencies {
+    compile "org.apache.beam:beam-sdks-java-extensions-kryo:${beam.version}"
+}
+```
+```xml
+//maven
+<dependency>
+  <groupId>org.apache.beam</groupId>
+  <artifactId>beam-sdks-java-extensions-kryo</artifactId>
+  <version>${beam.version}</version>
+</dependency>
+```
+
+All you need is to create `KryoCoderProvider` and register it to your
+`Pipeline`. There are two ways of doing that.
+
+When prototyping you may decide not to care much about coders, then create 
`KryoCoderProvider` without any class registrations to 
[Kryo](https://github.com/EsotericSoftware/kryo).
+```java
+//Register `KryoCoderProvider` which attempt to use `KryoCoder` to every 
non-primitive type
+KryoCoderProvider.of().registerTo(pipeline);
+```
+Such a `KryoCoderProvider` will return `KryoCoder` for every non-primitive 
element type. That of course degrades performance, since Kryo is not able to 
serialize instance of unknown types effectively. But it boost speed of pipeline 
development. This behavior is enabled by default and can be disabled when 
creating `Pipeline` through `KryoOptions`.
+```java
+PipelineOptions options = PipelineOptionsFactory.create();
+options.as(KryoOptions.class).setKryoRegistrationRequired(true);
+```
+
+Second more performance friendly way is to register all the types which will 
Kryo serialize. Sometimes it is also a good idea to register Kryo serializers 
of its own too. Euphoria allows you to do that by implementing your own 
`KryoRegistrar` and using it when creating `KryoCoderProvider`.
+```java
+//Do not allow `KryoCoderProvider` to return `KryoCoder` for unregistered types
+options.as(KryoOptions.class).setKryoRegistrationRequired(true);
+
+KryoCoderProvider.of(
+        (kryo) -> { //KryoRegistrar of your uwn
+          kryo.register(KryoSerializedElementType.class); //other may follow
+        })
+    .registerTo(pipeline);
+```
+Beam resolves coders using types of elements. Type information is not 
available at runtime when element type is described by lambda implementation. 
It is due to type erasure and dynamic nature of lambda expressions. So there is 
an optional way of supplying `TypeDescriptor` every time new type is introduced 
during Operator construction.
+```java
+Dataset<Integer> input = ...
+
+MapElements
+  .named("Int2Str")
+  .of(input)
+  .using(String::valueOf, TypeDescriptors.strings())
+  .output();
+```
+Euphoria operator's will use `TypeDescriptor<Object>`, when `TypeDescriptors` 
is not supplied by user. So `KryoCoderProvider` may return `KryoCOder<Object>` 
for every element with unknown type, if allowed by `KryoOptions`. Supplying 
`TypeDescriptors` becomes mandatory when using 
`.setKryoRegistrationRequired(true)`.
+
+### Metrics and Accumulators
+Statistics about job's internals are very helpful during development of 
distributed jobs. Euphoria calls them accumulators. They are accessible through 
environment `Context`, which can be obtained from `Collector`, whenever working 
with it. It is usually present when zero-to-many output elements are expected 
from operator. For example in case of `FlatMap`.
+```java
+Pipeline pipeline = ...
+Dataset<String> dataset = ..
+
+Dataset<String> mapped =
+FlatMap
+  .named("FlatMap1")
+  .of(dataset)
+  .using(
+    (String value, Collector<String> context) -> {
+      context.getCounter("my-counter").increment();
+        context.collect(value);
     })
+  .output();
+```
+`MapElements` also allows for `Context` to be accessed by supplying 
implementations of `UnaryFunctionEnv` (add second context argument) instead of 
`UnaryFunctor`.
+```java
+Pipeline pipeline = ...
+Dataset<String> dataset = ...
+
+Dataset<String> mapped =
+  MapElements
+    .named("MapThem")
+    .of(dataset)
+    .using(
+      (input, context) -> {
+        // use simple counter
+        context.getCounter("my-counter").increment();
+
+        return input.toLowerCase();
+        })
+      .output();
+```
+Accumulators are translated into Beam Metrics in background so they can be 
viewed the same way. Namespace of translated metrics is set to operator's name.
+
+### Windowing
+Euphoria follows the same [windowing principles]({{ site.baseurl 
}}/documentation/programming-guide/#windowing) as Beam Java SDK. Every shuffle 
operator (operator which needs to shuffle data over the network) allows you to 
set it. The same parameters as in Beam are required. `WindowFn`, `Trigger`, 
`WindowingStrategy` and other. Users are guided to either set all mandatory and 
several optional parameters  or none when building an operator. Windowing is 
propagated down through the `Pipeline`.
+```java
+Dtaset<KV<Integer, Long>> countedElements =
+CountByKey.of(input)
+    .keyBy(e -> e)
+    .windowBy(FixedWindows.of(Duration.standardSeconds(1)))
+    .triggeredBy(DefaultTrigger.of())
+    .discardingFiredPanes()
+    .withAllowedLateness(Duration.standardSeconds(5))
+    .withOnTimeBehavior(OnTimeBehavior.FIRE_IF_NON_EMPTY)
+    .withTimestampCombiner(TimestampCombiner.EARLIEST)
+    .output();
+```
+
+### Integration of Euphoria into existing pipelines
+`Euphoria` allows to define composite `PTransform` so Euphoria can be 
seamlessly integrated to already existing Beam `Pipelines`. User only need to 
provide implementation of function which takes input `Dataset`  and outputs 
another `Datatset`. The input dataset is nothing else than mirror of a input 
`PCollection`. Output `Dataset` is transformed to `Pcollection` automatically.
+```java
+//suppose inputs PCollection contains: [ "a", "b", "c", "A", "a", "C", "x"]
+PCollection<KV<String, Long>> lettersWithCounts =
+  inputs.apply("count-uppercase-letters-in-Euphoria",
+    Euphoria.of(
+      (Dataset<String> input) -> {
+        Dataset<String> upperCase =
+          MapElements.of(input)
+            .using((UnaryFunction<String, String>) String::toUpperCase)
+            .output();
+
+        return CountByKey.of(upperCase).keyBy(e -> e).output();
+    }));
+//now the 'lettersWithCounts' will conntain [ KV("A", 3L), KV("B", 1L), 
KV("C", 2L), KV("X", 1L) ]
+```
+
+## How to get Euphoria
+Euphoria is located in `dsl-euphoria` branch, 
`beam-sdks-java-extensions-euphoria` module of The Apache Beam project. To 
build `euphoria` subproject call:
+```
+./gradlew beam-sdks-java-extensions-euphoria:build
+```
+
+## Operator Reference
+Operators are basically higher level data transformations, which allows you to 
build business logic of your data processing job in a simple way. All the 
Euphoria operators are documented in this section including examples. There are 
no examples with [windowing](#windowing) applied for the sake of simplicity. 
Refer to the [windowing section](#windowing) for more details.
+
+### `CountByKey`
+Counting elements with the same key. Requires input dataset to be mapped by 
given key extractor (`UnaryFunction`) to keys which are then counted. Output is 
emitted as `KV<K, Long>` (`K` is key type) where each `KV` contains key and 
number of element in input dataset for the key.
+```java
+// suppose input: [1, 2, 4, 1, 1, 3]
+Dataset<KV<Integer, Long>> output =
+  CountByKey.of(input)
+    .keyBy(e -> e)
     .output();
+// Output will contain:  [KV(1, 3), KV(2, 1), KV(3, 1), (4, 1)]
+```
 
-// From each input element we extract a key (word) and value, which is the 
constant `1`.
-// Then, we reduce by the key - the operator ensures that all values for the 
same
-// key end up being processed together. It applies user defined function 
(summing word counts for each
-// unique word) and its emitted to output. 
-Dataset<Pair<String, Long>> counted = ReduceByKey.named("COUNT")
+### `Distinct`
+ Outputting distinct (based on equals method) elements. It takes optional 
`UnaryFunction` mapper parameter which maps elements to output type.
+ ```java
+// suppose input: [1, 2, 3, 3, 2, 1]
+Distinct.named("unique-integers-only")
+  .of(input)
+  .output();
+// Output will contain:  1, 2, 3
+ ```
+`Distinct` with mapper.
+```java
+// suppose keyValueInput: [KV(1, 100L), KV(3, 100_000L), KV(42, 10L), KV(1, 
0L), KV(3, 0L)]
+Distinct.named("unique-keys-only")
+  .of(keyValueInput)
+  .mapped(KV::getKey)
+  .output();
+// Output will contain:  1, 3, 42
+```
+
+### `Join`
+Represents inner join of two (left and right) datasets on given key producing 
a new dataset. Key is extracted from both datasets by separate extractors so 
elements in left and right can have different types denoted as `LeftT` and 
`RightT`. The join itself is performed by user-supplied `BinaryFunctor` which 
consumes elements from both dataset sharing the same key. And outputs result of 
the join (`OutputT`). The operator emits output dataset of `KV<K, OutputT>` 
type.
+```java
+// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
+// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", 
"duck"]
+Dataset<KV<Integer, String>> joined =
+  Join.named("join-length-to-words")
+    .of(left, right)
+    .by(le -> le, String::length) // key extractors
+    .using((Integer l, String r, Collector<String> c) -> c.collect(l + "+" + 
r))
+    .output();
+// joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"), KV(4, 
"4+duck"),
+// KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X")]
+```
+
+### `LeftJoin`
+Represents left join of two (left and right) datasets on given key producing 
single new dataset. Key is extracted from both datasets by separate extractors 
so elements in left and right can have different types denoted as `LeftT` and 
`RightT`. The join itself is performed by user-supplied `BinaryFunctor` which 
consumes one element from both dataset, where right is present optionally, 
sharing the same key. And outputs result of the join (`OutputT`). The operator 
emits output dataset of `KV<K, OutputT>` type.
+```java
+// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
+// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", 
"duck"]
+    Dataset<KV<Integer, String>> joined =
+        LeftJoin.named("left-join-length-to-words")
+            .of(left, right)
+            .by(le -> le, String::length) // key extractors
+            .using(
+                (Integer l, Optional<String> r, Collector<String> c) ->
+                    c.collect(l + "+" + r.orElse(null)))
+            .output();
+// joined will contain: [KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"),
+// KV(3, "3+rat"), KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"),
+// KV(3, "3+rat"), KV(1, "1+X")]
+```
+Euphoria support performance optimization called 'BroadcastHashJoin' for the 
`LeftJoin`. User can indicate through previous operator's output hint 
`.output(SizeHint.FITS_IN_MEMORY)` that output `Dataset` of that operator fits 
in executors memory. And when the `Dataset` is used as right input, Euphoria 
will automatically translated `LeftJoin` as 'BroadcastHashJoin'. Broadcast join 
can be very efficient when joining between skewed datasets.
+
+### `RightJoin`
+Represents right join of two (left and right) datasets on given key producing 
single new dataset. Key is extracted from both datasets by separate extractors 
so elements in left and right can have different types denoted as `LeftT` and 
`RightT`. The join itself is performed by user-supplied `BinaryFunctor` which 
consumes one element from both dataset, where left is present optionally, 
sharing the same key. And outputs result of the join (`OutputT`). The operator 
emits output dataset of `KV<K, OutputT>` type.
+```java
+// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
+// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", 
"duck"]
+Dataset<KV<Integer, String>> joined =
+  RightJoin.named("right-join-length-to-words")
+    .of(left, right)
+    .by(le -> le, String::length) // key extractors
+    .using(
+      (Optional<Integer> l, String r, Collector<String> c) ->
+        c.collect(l.orElse(null) + "+" + r))
+    .output();
+    // joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"),
+    // KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X"),
+    // KV(8, "null+elephant"), KV(5, "null+mouse")]
+```
+Euphoria support performance optimization called 'Broadcast Hash Join' for the 
`RightJoin`. User can indicate through previous operator's output hint 
`.output(SizeHint.FITS_IN_MEMORY)` that output `Dataset` of that operator fits 
in executors memory. And when the `Dataset` is used as left input, Euphoria 
will automatically translated `RightJoin` as 'Broadcast Hash Join'. Broadcast 
join can be very efficient when joining between skewed datasets.
+
+### `FullJoin`
+Represents full outer join of two (left and right) datasets on given key 
producing single new dataset. Key is extracted from both datasets by separate 
extractors so elements in left and right can have different types denoted as 
`LeftT` and `RightT`. The join itself is performed by user-supplied 
`BinaryFunctor` which consumes one element from both dataset, where both are 
present only optionally, sharing the same key. And outputs result of the join 
(`OutputT`). The operator emits output dataset of `KV<K, OutputT>` type.
+```java
+// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
+// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", 
"duck"]
+Dataset<KV<Integer, String>> joined =
+  FullJoin.named("join-length-to-words")
+    .of(left, right)
+    .by(le -> le, String::length) // key extractors
+    .using(
+      (Optional<Integer> l, Optional<String> r, Collector<String> c) ->
+        c.collect(l.orElse(null) + "+" + r.orElse(null)))
+    .output();
+// joined will contain: [ KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"), KV(3, 
"3+rat"),
+// KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"),KV(1, 
"1+X"),
+//  KV(1, "null+elephant"), KV(5, "null+mouse")]
+```
+
+### `MapElements`
+Transforms one input element of input type `InputT` to one output element of 
another (potentially the same) `OutputT` type. Transformation is done through 
user specified `UnaryFunction`.
+```java
+// suppose inputs contains: [ 0, 1, 2, 3, 4, 5]
+Dataset<String> strings =
+  MapElements.named("int2str")
+    .of(input)
+    .using(i -> "#" + i)
+    .output();
+// strings will contain: [ "#0", "#1", "#2", "#3", "#4", "#5"]
+```
+
+### `FlatMap`
+Transforms one input element of input type `InputT` to zero or more output 
elements of another (potentially the same) `OutputT` type. Transformation is 
done through user specified `UnaryFunctor`, where `Collector<OutputT>` is 
utilized to emit output elements. Notice similarity with `MapElements` which 
can always emit only one element.
+```java
+// suppose words contain: ["Brown", "fox", ".", ""]
+Dataset<String> letters =
+  FlatMap.named("str2char")
     .of(words)
-    .keyBy(w -> w)
-    .valueBy(w -> 1L)
-    .combineBy(Sums.ofLongs())
+    .using(
+      (String s, Collector<String> collector) -> {
+        for (int i = 0; i < s.length(); i++) {
+          char c = s.charAt(i);
+          collector.collect(String.valueOf(c));
+        }
+      })
     .output();
+// characters will contain: ["B", "r", "o", "w", "n",  "f", "o", "x", "."]
+```
+`FlatMap` may be used to determine time-stamp of elements. It is done by 
supplying implementation of `ExtractEventTime` time extractor when building it. 
There is specialized `AssignEventTime` operator to assign time-stamp to 
elements. Consider using it, you code may be more readable.
+```java
+// suppose events contain events of SomeEventObject, its 
'getEventTimeInMillis()' methods returns time-stamp
+Dataset<SomeEventObject> timeStampedEvents =
+  FlatMap.named("extract-event-time")
+    .of(events)
+    .using( (SomeEventObject e, Collector<SomeEventObject> c) -> c.collect(e))
+    .eventTimeBy(SomeEventObject::getEventTimeInMillis)
+    .output();
+//Euphoria will now know event time for each event
+```
 
-// Format output.
-Dataset<String> output = MapElements.named("FORMAT")
-    .of(counted)
-    .using(p -> p.getFirst() + ": " + p.getSecond())
+### `Filter`
+`Filter` throws away all the elements which do not pass given condition. The 
condition is supplied by the user as implementation of `UnaryPredicate`. Input 
and output elements are of the same type.
+```java
+// suppose nums contains: [0,  1, 2, 3, 4, 5, 6, 7, 8, 9]
+Dataset<Integer> divisibleBythree =
+  Filter.named("divisibleByThree").of(nums).by(e -> e % 3 == 0).output();
+//divisibleBythree will contain: [ 0, 3, 6, 9]
+```
+
+### `ReduceByKey`
+Performs aggregation of `InputT` type elements with the same key through 
user-supplied reduce function. Key is extracted from each element through 
`UnaryFunction` which takes input element and outputs its key of type `K`. 
Elements can optionally be mapped to value of type `V`, it happens before 
elements shuffle, so it can have positive performance influence.
+
+Finally, elements with the same key are aggregated by user-defined 
`ReduceFunctor`, `ReduceFunction` or `CombinableReduceFunction`. They differs 
in number of arguments they take and in way output is interpreted. 
`ReduceFunction` is basically a function which takes `Stream` of elements as 
input and outputs one aggregation result. `ReduceFunctor` takes second 
`Collector` which allows for access to `Context`. When 
`CombinableReduceFunction` is provided, partial reduction is performed before 
shuffle so less data have to be transported through network.
+
+Following example shows basic usage of `ReduceByKey` operator including value 
extraction.
+```java
+//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
+Dataset<KV<Integer, Long>> countOfAnimalNamesByLength =
+  ReduceByKey.named("to-letters-couts")
+    .of(animals)
+    .keyBy(String::length) // length of animal name will be used as groupping 
key
+    // we need to count each animal name once, so why not to optimize each 
string to 1
+    .valueBy(e -> 1)
+    .reduceBy(Stream::count)
     .output();
+// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), 
KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
+```
 
-// Transform Dataset back to PCollection. It can be done in any step of this 
flow.
-PCollection<String> outputCollection = flow.unwrapped(output);
+Now suppose that we want to track our `ReduceByKey` internals using counter.
+```java
+//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
+Dataset<KV<Integer, Long>> countOfAnimalNamesByLenght =
+  ReduceByKey.named("to-letters-couts")
+    .of(animals)
+    .keyBy(String::length) // length of animal name will be used as grouping 
key
+    // we need to count each animal name once, so why not to optimize each 
string to 1
+    .valueBy(e -> 1)
+    .reduceBy(
+      (Stream<Integer> s, Collector<Long> collector) -> {
+        collector.collect(s.count());
+        collector.asContext().getCounter("num-of-keys").increment();
+      })
+      .output();
+// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), 
KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
+```
 
-// Now we can again use Beam transformation. In this case we save words and 
their count
-// into the text file.
-outputCollection.apply(TextIO.write().to(options.getOutput()));
+Again the same example with optimized combinable output.
+```java
+//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
+Dataset<KV<Integer, Long>> countOfAnimalNamesByLenght =
+  ReduceByKey.named("to-letters-couts")
+    .of(animals)
+    .keyBy(String::length) // length of animal name will e used as grouping key
+    // we need to count each animal name once, so why not to optimize each 
string to 1
+    .valueBy(e -> 1L)
+    .combineBy(s -> s.mapToLong(l -> l).sum()) //Stream::count will not be 
enough
+    .output();
+// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), 
KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
+```
+Note that the provided `CombinableReduceFunction` has to be associative and 
commutative to be truly combinable. So it can be used to compute partial 
results before shuffle. And then merge partial result to one. That is why 
simple `Stream::count` will not work in this example unlike in the previous one.
 
-pipeline.run();
+Euphoria aims to make code easy to write and read. Therefore some support to 
write combinable reduce functions in form of `Fold` or folding function is 
already there. It allows user to supply only the reduction logic 
(`BinaryFunction`) and creates `CombinableReduceFunction` out of it. Supplied 
`BinaryFunction` still have to be associative.
+```java
+//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
+Dataset<KV<Integer, Long>> countOfAnimalNamesByLenght =
+  ReduceByKey.named("to-letters-couts")
+    .of(animals)
+    .keyBy(String::length) // length of animal name will be used as grouping 
key
+    // we need to count each animal name once, so why not to optimize each 
string to 1
+    .valueBy(e -> 1L)
+    .combineBy(Fold.of((l1, l2) -> l1 + l2))
+    .output();
+// countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), 
KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
+```
+
+### `ReduceWindow`
+Reduces all elements in a [window](#windowing). The operator corresponds to 
`ReduceByKey` with the same key for all elements, so the actual key is defined 
only by window.
+```java
+//suppose input contains [ 1, 2, 3, 4, 5, 6, 7, 8 ]
+//lets assign time-stamp to each input element
+Dataset<Integer> withEventTime = AssignEventTime.of(input).using(i -> 1000L * 
i).output();
+
+Dataset<Integer> output =
+  ReduceWindow.of(withEventTime)
+    .combineBy(Fold.of((i1, i2) -> i1 + i2))
+    .windowBy(FixedWindows.of(Duration.millis(5000)))
+    .triggeredBy(DefaultTrigger.of())
+    .discardingFiredPanes()
+    .output();
+//output will contain: [ 10, 26 ]
+```
+
+### `SumByKey`
+Summing elements with same key. Requires input dataset to be mapped by given 
key extractor (`UnaryFunction`) to keys. By value extractor, also 
`UnaryFunction` which outputs to `Long`, to values. Those values are then 
grouped by key and summed. Output is emitted as `KV<K, Long>` (`K` is key type) 
where each `KV` contains key and number of element in input dataset for the key.
+```java
+//suppose input contains: [ 1, 2, 3, 4, 5, 6, 7, 8, 9 ]
+Dataset<KV<Integer, Long>> output =
+  SumByKey.named("sum-odd-and-even")
+    .of(input)
+    .keyBy(e -> e % 2)
+    .valueBy(e -> (long) e)
+    .output();
+// output will contain: [ KV.of(0, 20L), KV.of(1, 25L)]
+```
+
+### `Union`
+Merge of at least two datasets of the same type without any guarantee about 
elements ordering.
+```java
+//suppose cats contains: [ "cheetah", "cat", "lynx", "jaguar" ]
+//suppose rodents conains: [ "squirrel", "mouse", "rat", "lemming", "beaver" ]
+Dataset<String> animals =
+  Union.named("to-animals")
+    .of(cats, rodents)
+    .output();
+// animal will contain: "cheetah", "cat", "lynx", "jaguar", "squirrel", 
"mouse", "rat", "lemming", "beaver"
 ```
 
+### `TopPerKey`
+Emits one top-rated element per key. Key of type `K` is extracted by given 
`UnaryFunction`. Another `UnaryFunction` extractor allows for conversion input 
elements to values of type `V`. Selection of top element is based on _score_, 
which is obtained from each element by user supplied `UnaryFunction` called 
score calculator. Score type is denoted as `ScoreT` and it is required to 
extend `Comparable<ScoreT>` so scores of two elements can be compared directly. 
Output dataset elements are of type `Triple<K, V, ScoreT>`.
+```java
+// suppose 'animals contain: [ "mouse", "elk", "rat", "mule", "elephant", 
"dinosaur", "cat", "duck", "caterpillar" ]
+Dataset<Triple<Character, String, Integer>> longestNamesByLetter =
+  TopPerKey.named("longest-animal-names")
+    .of(animals)
+    .keyBy(name -> name.charAt(0)) // first character is the key
+    .valueBy(UnaryFunction.identity()) // value type is the same as input 
element type
+    .scoreBy(String::length) // length defines score, note that Integer 
implements Comparable<Integer>
+    .output();
+//longestNamesByLetter wil contain: [ ('m', "mouse", 5), ('r', "rat", 3), 
('e', "elephant", 8), ('d', "dinosaur", 8), ('c', "caterpillar", 11) ]
+```
+`TopPerKey` is a shuffle operator so it allows for widowing to be defined.
+
+### `AssignEventTime`
+Euphoria needs to know how to extract time-stamp from elements when 
[windowing](#windowing) is applied. `AssignEventTime` tells Euphoria how to do 
that through given implementation of `ExtractEventTime` function.
+```java
+// suppose events contain events of SomeEventObject, its 
'getEventTimeInMillis()' methods returns time-stamp
+Dataset<SomeEventObject> timeStampedEvents =
+  AssignEventTime.named("extract-event-tyme")
+    .of(events)
+    .using(SomeEventObject::getEventTimeInMillis)
+    .output();
+//Euphoria will now know event time for each event
+```
+
+## Euphoria To Beam Translation (advanced user section)
+Euphoria API is build on top of Beam Java SDK. The API is transparently 
translated into Beam's `PTransforms` in background. Most of the translation 
happens in `org.apache.beam.sdk.extensions.euphoria.core.translate` package. 
Where the most interesting classes are:
+* `OperatorTranslator` - Interface which defining inner API of Euphoria to 
Beam translation.
+* `TranslatorProvider` - Way of supplying custom translators.
+* `OperatorTransform` - Which is governing actual translation and/or expansion 
Euphoria's operators to Beam's `PTransform`
+* `EuphoriaOptions` - A `PipelineOptions`, allows for setting custom 
`TranslatorProvider`.
+
+The package also contains implementation of `OperatorTranslator` for each 
supported operator type (`JoinTranslator`, `FlatMapTranslator`, 
`ReduceByKeyTranslator`). Not every operator needs to have translator of its 
own. Some of them can be composed from other operators. That is why operators 
may implement `CompositeOperator` which give them option to be exanded to set 
of other Euphoria operators.
 
+The translation process was designed with flexibility in mind. We wanted to 
allow different ways of translating higher-level Euphoria operators to Beam's 
SDK's primitives. It allows for further performance optimizations based on user 
choices or some knowledge about data obtained automatically.  
 
+### Unsupported Features
+[Original Euphoria](https://github.com/seznam/euphoria) contained some 
features and operators not jet supported in Beam port. List of not yet 
supported features follows:
+* Translation of `ReduceStateByKey` operator to Beam is not supported. 
Therefore `TopPerKey` decomposable to RSBK is also not supported.
+* `ReduceByKey` in original Euphoria was allowed to sort output values (per 
key). This is also not yet translatable into Beam, therefore not supported.


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 152512)
    Time Spent: 2.5h  (was: 2h 20m)

> Write Euphoria in Beam documentation
> ------------------------------------
>
>                 Key: BEAM-5124
>                 URL: https://issues.apache.org/jira/browse/BEAM-5124
>             Project: Beam
>          Issue Type: Sub-task
>          Components: dsl-euphoria
>            Reporter: Vaclav Plajt
>            Assignee: Vaclav Plajt
>            Priority: Major
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to