[ 
https://issues.apache.org/jira/browse/FLINK-8780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16465549#comment-16465549
 ] 

ASF GitHub Bot commented on FLINK-8780:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5922#discussion_r186342376
  
    --- Diff: docs/dev/stream/state/broadcast_state.md ---
    @@ -0,0 +1,279 @@
    +---
    +title: "The Broadcast State Pattern"
    +nav-parent_id: streaming_state
    +nav-pos: 2
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +* ToC
    +{:toc}
    +
    +[Working with State](state.html) describes operator state which upon 
restore is either evenly distributed among the 
    +parallel tasks of an operator, or unioned, with the whole state being used 
to initialize the restored parallel tasks.
    +
    +A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use cases
    +where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
    +and is used to process all incoming elements on the other stream. As an 
example where broadcast state can emerge as a 
    +natural fit, one can imagine a low-throughput stream containing a set of 
rules which we want to evaluate against all 
    +elements coming from another stream. Having the above type of use cases in 
mind, broadcast state differs from the rest 
    +of operator states in that:
    + 1. it has a map format,
    + 2. it is only available to specific operators that have as inputs a 
*broadcasted* stream and a *non-broadcasted* one, and
    + 3. such an operator can have *multiple broadcast states* with different 
names.
    +
    +## Provided APIs
    +
    +To show the provided APIs, we will start with an example before presenting 
their full functionality. As our running 
    +example, we will use the case where we have a stream of objects of 
different colors and shapes and we want to find pairs
    +of objects of the same color that follow a certain pattern, *e.g.* a 
rectangle followed by a triangle. We assume that
    +the set of interesting patterns evolves over time. 
    +
    +In this example, the first stream will contain elements of type `Item` 
with a `Color` and a `Shape` property. The other
    +stream will contain the `Rules`.
    +
    +Starting from the stream of `Items`, we just need to *key it* by `Color`, 
as we want pairs of the same color. This will
    +make sure that elements of the same color end up on the same physical 
machine.
    +
    +{% highlight java %}
    +// key the shapes by color
    +KeyedStream<Item, Color> colorPartitionedStream = shapeStream
    +                        .keyBy(new KeySelector<Shape, Color>(){...});
    +{% endhighlight %}
    +
    +Moving on to the `Rules`, the stream containing them should be broadcasted 
to all downstream tasks, and these tasks 
    +should store them locally so that they can evaluate them against all 
incoming `Items`. The snippet below will i) broadcast 
    +the stream of rules and ii) using the provided `MapStateDescriptor`, it 
will create the broadcast state where the rules
    +will be stored.
    +
    +{% highlight java %}
    +
    +// a map descriptor to store the name of the rule (string) and the rule 
itself.
    +MapStateDescriptor<String, Rule> ruleStateDescriptor = new 
MapStateDescriptor<>(
    +                           "RulesBroadcastState",
    +                           BasicTypeInfo.STRING_TYPE_INFO,
    +                           TypeInformation.of(new TypeHint<Rule>() {})
    +           );
    +           
    +// broadcast the rules and create the broadcast state
    +BroadcastStream<Rule> ruleBroadcastStream = ruleStream
    +                        .broadcast(ruleStateDescriptor);
    +{% endhighlight %}
    +
    +Finally, in order to evaluate the `Rules` against the incoming elements 
from the `Item` stream, we need to:
    +    1) connect the two streams and 
    +    2) specify our match detecting logic. 
    +
    +Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be 
done by calling `connect()` on the 
    +non-broadcasted stream, with the `BroadcastStream` as an argument. This 
will return a `BroadcastConnectedStream`, on 
    +which we can call `process()` with a special type of `CoProcessFunction`. 
The function will contain our matching logic. 
    +The exact type of the function depends on the type of the non-broadcasted 
stream: 
    + - if that is **keyed**, then the function is a 
`KeyedBroadcastProcessFunction`. 
    + - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
    + 
    + Given that our non-broadcasted stream is keyed, the following snippet 
includes the above calls:
    +
    +<div class="alert alert-info">
    +  <strong>Attention:</strong> The connect should be called on the 
non-broadcasted stream, with the `BroadcastStream`
    +   as an argument.
    +</div>
    +
    +{% highlight java %}
    +DataStream<Match> output = colorPartitionedStream
    +                 .connect(ruleBroadcastStream)
    +                 .process(
    +                     
    +                     // type arguments in our 
KeyedBroadcastProcessFunction represent: 
    +                     //   1. the key of the keyed stream
    +                     //   2. the type of elements in the non-broadcast side
    +                     //   3. the type of elements in the broadcast side
    +                     //   4. the type of the result, here a string
    +                     
    +                     new KeyedBroadcastProcessFunction<Color, Item, Rule, 
String>() {
    +                         // my matching logic
    +                     }
    +                 )
    +{% endhighlight %}
    +
    +### BroadcastProcessFunction and KeyedBroadcastProcessFunction
    +
    +As in the case of a `CoProcessFunction`, these methods have two "sides", 
one is responsible for processing incoming 
    +elements in the broadcasted stream and one is used for the non-broadcasted 
one. This is reflected in the methods to be 
    +implemented, which are presented below.
    +
    +{% highlight java %}
    +public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends 
BaseBroadcastProcessFunction {
    +
    +    public abstract void processElement(IN1 value, ReadOnlyContext ctx, 
Collector<OUT> out) throws Exception;
    +
    +    public abstract void processBroadcastElement(IN2 value, Context ctx, 
Collector<OUT> out) throws Exception;
    +}
    +{% endhighlight %}
    +
    +{% highlight java %}
    +public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
    +
    +    public abstract void processElement(IN1 value, ReadOnlyContext ctx, 
Collector<OUT> out) throws Exception;
    +
    +    public abstract void processBroadcastElement(IN2 value, Context ctx, 
Collector<OUT> out) throws Exception;
    +
    +    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> 
out) throws Exception;
    +}
    +{% endhighlight %}
    +
    +The first thing to notice is that both functions require the 
implementation of the `processBroadcastElement()` method 
    +for processing elements in the broadcast side and the `processElement()` 
for elements in the non-broadcasted side. 
    +
    +The two methods differ in the context they are provided. The non-broadcast 
side has a `ReadOnlyContext`, while the 
    +broadcasted side has a `Context`. 
    +
    +Both of these contexts (`ctx` in the following enumeration):
    + 1. give access to the broadcast state: 
`ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)`
    + 2. allow to query the timestamp of the element: `ctx.timestamp()`, 
    + 3. get the current watermark: `ctx.currentWatermark()`
    + 4. get the current processing time: `ctx.currentProcessingTime()`, and 
    + 5. emit elements to side-outputs: `ctx.output(OutputTag<X> outputTag, X 
value)`. 
    +
    +The `stateDescriptor` in the `getBroadcastState()` should be identical to 
the one in the `.broadcast(ruleStateDescriptor)` 
    +above.
    +
    +The difference lies in the type of access each one gives to the broadcast 
state. The broadcasted side has 
    +**read-write access** to it, while the non-broadcast side has **read-only 
access** (thus the names). The reason for this
    +is that in Flink there is no cross-task communication. So, to guarantee 
that the contents in the Broadcast State are the
    +same across all parallel instances of our operator, we give read-write 
access only to the broadcast side, which sees the
    +same elements across all tasks, and we require the computation on each 
incoming element on that side to be identical 
    +across all tasks. Ignoring this rule would break the consistency 
guarantees of the state, leading to inconsistent and 
    +often difficult to debug results.
    +
    +<div class="alert alert-info">
    +  <strong>Attention:</strong> The logic implemented in 
`processBroadcast()` must have the same determinstic behavior 
    +  across all parallel instances!
    +</div>
    +
    +Finally, due to the fact that the `KeyedBroadcastProcessFunction` is (on 
one "side") operating on a keyed stream, it 
    +exposes some functionality which is not available to the 
`BroadcastProcessFunction`. That is:
    + 1. the `ReadOnlyContext` in the `processElement()` method gives access to 
Flink's underlying timer service, which allows
    +  to register event and/or processing time timers. When a timer fires, the 
`onTimer()` (shown above) is invoked with an 
    +  `OnTimerContext` which exposes the same functionality as the 
`ReadOnlyContext` plus 
    +   - the ability to ask if the timer that fired was an event or processing 
time one and 
    +   - to query the key associated with the timer.
    +
    +  This is aligned with the `onTimer()` method of the 
`KeyedProcessFunction`. 
    + 2. the `Context` in the `processBroadcastElement()` method contains the 
method 
    + `applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, 
KeyedStateFunction<KS, S> function)`. This allows to 
    +  register a `KeyedStateFunction` to be **applied to all states of all 
keys** associated with the provided `stateDescriptor`. 
    +
    +<div class="alert alert-info">
    +  <strong>Attention:</strong> Registering timers is only possible at 
`processElement()` of the `KeyedBroadcastProcessFunction`
    +  and only there. It is not possible in the `processBroadcastElement()` 
method, as there is no key associated to the 
    +  broadcasted elements.
    +</div>
    +  
    +Coming back to our original example, our `KeyedBroadcastProcessFunction` 
could look like the following:
    +
    +{% highlight java %}
    +new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
    +
    +    // store partial matches, i.e. first elements of the pair waiting for 
their second element
    +    // we keep a list as we may have many first elements waiting
    +    private final MapStateDescriptor<String, List<Item>> mapStateDesc =
    +       new MapStateDescriptor<>(
    +           "items",
    +           BasicTypeInfo.STRING_TYPE_INFO, 
    +           new ListTypeInfo<>(Item.class));
    +
    +    // identical to our ruleStateDescriptor above
    +    private final MapStateDescriptor<String, Rule> ruleStateDescriptor = 
    +        new MapStateDescriptor<>(
    +               "RulesBroadcastState",
    +                   BasicTypeInfo.STRING_TYPE_INFO,
    +                   TypeInformation.of(new TypeHint<Rule>() {}));
    +
    +   @Override
    +   public void processBroadcastElement(Rule value, 
    +                                       Context ctx, 
    +                                       Collector<String> out) throws 
Exception {
    +       ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
    +   }
    +
    +   @Override
    +   public void processElement(Item value, 
    +                              ReadOnlyContext ctx, 
    +                              Collector<String> out) throws Exception {
    +
    +        final MapState<String, List<Item>> state = 
getRuntimeContext().getMapState(mapStateDesc);
    +        final Shape shape = value.getShape();
    +    
    +        for (Map.Entry<String, Rule> entry: 
    +                
ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
    +            final String ruleName = entry.getKey();
    +            final Rule rule = entry.getValue();
    +    
    +            List<Item> stored = state.get(ruleName);
    +            if (stored == null) {
    +                stored = new ArrayList<>();
    +            }
    +    
    +            if (shape == rule.second && !stored.isEmpty()) {
    +                for (Item i : stored) {
    +                    out.collect("MATCH: " + i + " - " + value);
    +                }
    +                stored.clear();
    +            }
    +    
    +            // there is  no else{} to cover if rule.first == rule.second
    +            if (shape.equals(rule.first)) {
    +                stored.add(value);
    +            }
    +    
    +            if (stored.isEmpty()) {
    +                state.remove(ruleName);
    +            } else {
    +                state.put(ruleName, stored);
    +            }
    +        }
    +   }
    +}
    +{% endhighlight %}
    +
    +## Important Considerations
    +
    +After describing the offered APIs, this section focuses on the important 
things to keep in mind when using broadcast 
    +state. These are:
    +
    +  - **There is no cross-task communication:** As stated earlier, this is 
the reason why only the broadcast side of a 
    +`(Keyed)-BroadcastProcessFunction` can modify the contents of the 
broadcast state. In addition, the user has to make 
    +sure that all tasks modify the contents of the broadcast state in the same 
way for each incoming element. Otherwise,
    +different tasks might have different contents, leading to inconsistent 
results.
    +
    +  - **Order of events in Broadcast State may differ across tasks:** 
Although broadcasting the elements of a stream 
    --- End diff --
    
    Maybe we should simply clearly state that, the state update for each 
incoming element **CANNOT** be order-depending.


> Add Broadcast State documentation.
> ----------------------------------
>
>                 Key: FLINK-8780
>                 URL: https://issues.apache.org/jira/browse/FLINK-8780
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, Documentation, Streaming
>    Affects Versions: 1.5.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>            Priority: Critical
>             Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to