GitHub user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5922#discussion_r187922328
  
    --- Diff: docs/dev/stream/state/broadcast_state.md ---
    @@ -0,0 +1,279 @@
    +---
    +title: "The Broadcast State Pattern"
    +nav-parent_id: streaming_state
    +nav-pos: 2
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +* ToC
    +{:toc}
    +
    +[Working with State](state.html) describes operator state which upon 
restore is either evenly distributed among the 
    +parallel tasks of an operator, or unioned, with the whole state being used 
to initialize the restored parallel tasks.
    +
    +A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use cases
    +where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
    +and is used to process all incoming elements on the other stream. As an 
example where broadcast state can emerge as a 
    +natural fit, one can imagine a low-throughput stream containing a set of 
rules which we want to evaluate against all 
    +elements coming from another stream. Having the above type of use cases in 
mind, broadcast state differs from the rest 
    +of operator states in that:
    + 1. it has a map format,
    + 2. it is only available to specific operators that have as inputs a 
*broadcasted* stream and a *non-broadcasted* one, and
    + 3. such an operator can have *multiple broadcast states* with different 
names.
    +
    +## Provided APIs
    +
    +To show the provided APIs, we will start with an example before presenting 
their full functionality. As our running 
    +example, we will use the case where we have a stream of objects of 
different colors and shapes and we want to find pairs
    +of objects of the same color that follow a certain pattern, *e.g.* a 
rectangle followed by a triangle. We assume that
    +the set of interesting patterns evolves over time. 
    +
    +In this example, the first stream will contain elements of type `Item` 
with a `Color` and a `Shape` property. The other
    +stream will contain the `Rules`.
    +
    +Starting from the stream of `Items`, we just need to *key it* by `Color`, 
as we want pairs of the same color. This will
    +make sure that elements of the same color end up on the same physical 
machine.
    +
    +{% highlight java %}
    +// key the items by color
    +KeyedStream<Item, Color> colorPartitionedStream = shapeStream
    +                        .keyBy(new KeySelector<Item, Color>(){...});
    +{% endhighlight %}
    +
    +Moving on to the `Rules`, the stream containing them should be broadcasted 
to all downstream tasks, and these tasks 
    +should store them locally so that they can evaluate them against all 
incoming `Items`. The snippet below will i) broadcast 
    +the stream of rules and ii) using the provided `MapStateDescriptor`, it 
will create the broadcast state where the rules
    +will be stored.
    +
    +{% highlight java %}
    +
    +// a map descriptor to store the name of the rule (string) and the rule 
itself.
    +MapStateDescriptor<String, Rule> ruleStateDescriptor = new 
MapStateDescriptor<>(
    +                           "RulesBroadcastState",
    +                           BasicTypeInfo.STRING_TYPE_INFO,
    +                           TypeInformation.of(new TypeHint<Rule>() {})
    +           );
    +           
    +// broadcast the rules and create the broadcast state
    +BroadcastStream<Rule> ruleBroadcastStream = ruleStream
    +                        .broadcast(ruleStateDescriptor);
    +{% endhighlight %}
    +
    +Finally, in order to evaluate the `Rules` against the incoming elements 
from the `Item` stream, we need to:
    + 1. connect the two streams, and
    + 2. specify our match-detecting logic.
    +
    +Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be 
done by calling `connect()` on the 
    +non-broadcasted stream, with the `BroadcastStream` as an argument. This 
will return a `BroadcastConnectedStream`, on 
    +which we can call `process()` with a special type of `CoProcessFunction`. 
The function will contain our matching logic. 
    +The exact type of the function depends on the type of the non-broadcasted 
stream: 
    + - if that is **keyed**, then the function is a 
`KeyedBroadcastProcessFunction`. 
    + - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
    + 
    + Given that our non-broadcasted stream is keyed, the following snippet 
includes the above calls:
    +
    +<div class="alert alert-info">
    +  <strong>Attention:</strong> The connect should be called on the 
non-broadcasted stream, with the `BroadcastStream`
    --- End diff --
    
    Calling `connect()` on the `BroadcastStream` is not even possible in this 
case. `broadcast(descriptor)` returns a `BroadcastStream` that exposes no 
transformations; you can only pass it as the argument to a `connect()`.
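
To make the point concrete, here is a minimal plain-Java sketch of the shape of the API. The `Sketch*` classes are hypothetical stand-ins written for this comment, not Flink's classes, and they model only which methods each stream type exposes, under the assumption that the broadcast side is a pure "argument" type:

```java
/**
 * Simplified stand-ins, NOT Flink's classes: they only model which
 * methods each kind of stream exposes.
 */
public class BroadcastConnectSketch {

    /** Models a broadcast stream: it deliberately exposes no transformations. */
    static final class SketchBroadcastStream<R> {
        private final String stateName;

        SketchBroadcastStream(String stateName) {
            this.stateName = stateName;
        }
    }

    /** Models the result of connect(), where the processing logic would be attached. */
    static final class SketchConnectedStream<T, R> {
        private final String description;

        SketchConnectedStream(String description) {
            this.description = description;
        }

        String describe() {
            return description;
        }
    }

    /** Models a regular (non-broadcasted) stream. */
    static final class SketchStream<T> {
        private final String name;

        SketchStream(String name) {
            this.name = name;
        }

        /** Hypothetical analogue of broadcast(descriptor); a String stands in for the descriptor. */
        SketchBroadcastStream<T> broadcast(String stateName) {
            return new SketchBroadcastStream<>(stateName);
        }

        /** connect() lives on the non-broadcasted side and takes the broadcast side as its argument. */
        <R> SketchConnectedStream<T, R> connect(SketchBroadcastStream<R> broadcast) {
            return new SketchConnectedStream<>(name + " + broadcast(" + broadcast.stateName + ")");
        }
    }

    static String demo() {
        SketchStream<String> items = new SketchStream<>("items");
        SketchBroadcastStream<String> rules =
                new SketchStream<String>("rules").broadcast("RulesBroadcastState");
        // rules.connect(items);  // would not compile: the broadcast side has no connect()
        return items.connect(rules).describe();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the broadcast-side type has no methods of its own, the wrong call order is rejected by the compiler, so the doc only needs to describe the one direction that exists.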


---
