I can see the benefits of control flow. E.g., it might help the old (and
inactive) FLIP-17 side input. I would suggest that we add more details of
some of the potential use cases.

Here is one mismatch with using control flow for dynamic config. Dynamic
config is typically targeted at, and loaded by, one specific operator, whereas
control flow will propagate the dynamic config to all operators. That is not a
problem per se, though.
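
To make the mismatch concrete, here is a minimal Python sketch. It is a toy
model, not Flink code: `ControlEvent`, `Operator`, `target_id` and `broadcast`
are hypothetical names. The event is delivered to every operator, and only
the targeted one applies it.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass(frozen=True)
class ControlEvent:
    """A dynamic-config update addressed to one operator (hypothetical type)."""
    target_id: str
    key: str
    value: str


@dataclass
class Operator:
    """Toy operator that sees every control event but applies only its own."""
    operator_id: str
    config: Dict[str, str] = field(default_factory=dict)
    seen: int = 0

    def on_control_event(self, event: ControlEvent) -> None:
        self.seen += 1  # the event reaches this operator regardless of target
        if event.target_id == self.operator_id:
            self.config[event.key] = event.value


def broadcast(operators: List[Operator], event: ControlEvent) -> None:
    """Propagate the event to all operators, as a control flow would."""
    for op in operators:
        op.on_control_event(event)


operators = [Operator("source"), Operator("filter"), Operator("sink")]
broadcast(operators, ControlEvent("filter", "threshold", "10"))
```

Every operator pays the cost of seeing the event, but only "filter" changes
its config, which is why the extra propagation is mostly harmless.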

Regarding using the REST API (to the JobManager) for accepting control
signals from external systems: where are we going to persist/checkpoint the
signal? The JobManager can die before the control signal is propagated and
checkpointed. Do we lose the control signal in that case?
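
One possible answer would be to persist the signal durably before
acknowledging the REST call, and to replay anything not yet covered by a
checkpoint after a restart. A minimal Python sketch of that idea follows. It
is a toy model, not Flink code: `SignalLog`, `Coordinator`, and the local
file standing in for durable storage are all assumptions.

```python
import json
import os
import tempfile
from typing import List, Set


class SignalLog:
    """Append-only log of control signals on durable storage (hypothetical)."""

    def __init__(self, path: str) -> None:
        self.path = path

    def append(self, signal: dict) -> None:
        # Write and fsync before the caller acknowledges the REST request.
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps(signal) + "\n")
            f.flush()
            os.fsync(f.fileno())

    def read_all(self) -> List[dict]:
        if not os.path.exists(self.path):
            return []
        with open(self.path, encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]


class Coordinator:
    """Toy stand-in for the JobManager side that accepts control signals."""

    def __init__(self, log: SignalLog, checkpointed_ids: Set[int]) -> None:
        self.log = log
        self.checkpointed_ids = checkpointed_ids  # ids covered by a checkpoint

    def accept(self, signal: dict) -> str:
        self.log.append(signal)  # persist first ...
        return "ACK"             # ... then acknowledge

    def pending_after_restart(self) -> List[dict]:
        # Acknowledged but not yet checkpointed signals must be replayed.
        return [s for s in self.log.read_all()
                if s["id"] not in self.checkpointed_ids]


log_path = os.path.join(tempfile.mkdtemp(), "signals.log")
first = Coordinator(SignalLog(log_path), checkpointed_ids=set())
first.accept({"id": 1, "key": "qps", "value": 100})
first.accept({"id": 2, "key": "log.level", "value": "DEBUG"})

# Simulate a crash: a fresh coordinator recovers from the log.
# Signal 1 is assumed to have made it into a checkpoint already.
recovered = Coordinator(SignalLog(log_path), checkpointed_ids={1})
pending = recovered.pending_after_restart()
```

In this model a signal is lost only if the process dies before `accept`
returns, in which case the caller never received an ACK and can retry.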


On Mon, Jun 7, 2021 at 11:05 PM Xintong Song <tonysong...@gmail.com> wrote:

> +1 on separating the effort into two steps:
>
>    1. Introduce a common control flow framework, with flexible interfaces
>    for generating / reacting to control messages for various purposes.
>    2. Features that leverage the control flow can be worked on
>    concurrently
>
> Meanwhile, continuing to collect potential features that may leverage the
> control flow should be helpful. It provides good input for the control
> flow framework design, to make the framework general enough to cover the
> potential use cases.
>
> My suggestions on the next steps:
>
>    1. Allow more time for opinions to be heard and potential use cases to
>    be collected
>    2. Draft a FLIP with the scope of common control flow framework
>    3. We probably need a PoC implementation to make sure the framework
>    covers at least the following scenarios
>       1. Produce control events from arbitrary operators
>       2. Produce control events from JobMaster
>       3. Consume control events from arbitrary operators downstream of
>       where the events are produced
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <yungao...@aliyun.com> wrote:
>
>> Many thanks to Jiangang for bringing this up, and many thanks for the
>> discussion!
>>
>> I also agree with the summary by Xintong and Jing that control flow seems
>> to be a common building block for many functionalities, and that the dynamic
>> configuration framework is a representative application that is frequently
>> requested by users. Regarding the control flow, we are currently also
>> considering the design of iteration for flink-ml, and as Xintong has pointed
>> out, it also requires the control flow in cases like detecting global
>> termination inside the iteration (in this case we need to broadcast an event
>> through the iteration body to detect whether there are still records residing
>> in the iteration body). Regarding whether to implement the dynamic
>> configuration framework, I also agree with Xintong that the consistency
>> guarantee would be a point to consider; we might consider whether we need to
>> ensure that every operator receives the dynamic configuration.
>>
>> Best,
>> Yun
>>
>>
>>
>> ------------------------------------------------------------------
>> Sender:kai wang<yiduwang...@gmail.com>
>> Date:2021/06/08 11:52:12
>> Recipient:JING ZHANG<beyond1...@gmail.com>
>> Cc:刘建刚<liujiangangp...@gmail.com>; Xintong Song [via Apache Flink User
>> Mailing List archive.]<ml+s2336050n44245...@n4.nabble.com>; user<
>> u...@flink.apache.org>; dev<dev@flink.apache.org>
>> Subject:Re: Add control mode for flink
>>
>>
>>
>> I'm big +1 for this feature.
>>
>>    1. Limit the input qps.
>>    2. Change log level for debug.
>>
>> In my team, the two examples above are needed.
>>
>> JING ZHANG <beyond1...@gmail.com> wrote on Tue, Jun 8, 2021 at 11:18 AM:
>>
>>> Thanks Jiangang for bringing this up.
>>> As mentioned in Jiangang's email, the `dynamic configuration framework`
>>> provides many useful functions at Kuaishou, because it can update job
>>> behavior without relaunching the job. These functions are very popular at
>>> Kuaishou, and we also see similar demands on the mailing list [1].
>>>
>>> I'm big +1 for this feature.
>>>
>>> Thanks Xintong and Yun for the deep thoughts on the issue. I like the
>>> idea of introducing a control mode in Flink.
>>> It takes the original issue a big step closer to its essence, and it also
>>> opens up the possibility of more fantastic features, as mentioned in
>>> Xintong's and Jark's responses.
>>> Based on the idea, there are at least two milestones for achieving the
>>> goals proposed by Jiangang:
>>> (1) Build a common control flow framework in Flink.
>>>      It focuses on control flow propagation and on how to integrate the
>>> common control flow framework with existing mechanisms.
>>> (2) Build a dynamic configuration framework that is exposed to users
>>> directly.
>>>      We can see the dynamic configuration framework as a top-level
>>> application on the underlying control flow framework.
>>>      It focuses on the public API that receives configuration update
>>> requests from users. Besides, it is necessary to introduce an API
>>> protection mechanism to avoid job performance degradation caused by too
>>> many control events.
>>>
>>> I suggest splitting the whole design into two after we reach a consensus
>>> on whether to introduce this feature, because both sub-topics need
>>> careful design.
>>>
>>>
>>> [1]
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-configuration-of-Flink-checkpoint-interval-td44059.html
>>>
>>> Best regards,
>>> JING ZHANG
>>>
>>> 刘建刚 <liujiangangp...@gmail.com> wrote on Tue, Jun 8, 2021 at 10:01 AM:
>>>
>>>> Thanks Xintong Song for the detailed supplement. Since Flink is
>>>> long-running, it is similar to many services, so interacting with it or
>>>> controlling it is a common desire. This was our initial thought when
>>>> implementing the feature. In our internal Flink, many configs set in YAML
>>>> can be adjusted dynamically to avoid restarting the job, for example:
>>>>
>>>>    1. Limit the input qps.
>>>>    2. Degrade the job by sampling and so on.
>>>>    3. Reset kafka offset in certain cases.
>>>>    4. Stop checkpoint in certain cases.
>>>>    5. Control the history consuming.
>>>>    6. Change log level for debug.
>>>>
>>>>
>>>> After deep discussion, we realized that a common control flow
>>>> will benefit both users and developers. Dynamic config is just one of the
>>>> use cases. The concrete design and implementation involve many
>>>> components, such as the JobMaster, network channels, operators and so on,
>>>> and need deeper consideration and design.
>>>>
>>>> Xintong Song [via Apache Flink User Mailing List archive.] <
>>>> ml+s2336050n44245...@n4.nabble.com> wrote on Mon, Jun 7, 2021 at 2:52 PM:
>>>>
>>>>> Thanks Jiangang for bringing this up, and Steven & Peter for the
>>>>> feedback.
>>>>>
>>>>> I was part of the preliminary offline discussions before this proposal
>>>>> went public. So maybe I can help clarify things a bit.
>>>>>
>>>>> In short, although the phrase "control mode" might be a bit misleading,
>>>>> what we truly want to do, from my side, is to make the concept of "control
>>>>> flow" explicit and expose it to users.
>>>>>
>>>>> ## Background
>>>>> Jiangang & his colleagues at Kuaishou maintain an internal version of
>>>>> Flink. One of their custom features allows dynamically changing
>>>>> operator behaviors via the REST APIs. He's willing to contribute this
>>>>> feature to the community, and came to Yun Gao and me for suggestions.
>>>>> After discussion, we feel that the underlying question to be answered is
>>>>> how we model the control flow in Flink. Dynamically controlling jobs via
>>>>> REST API can be one of the features built on top of the control flow, and
>>>>> there could be others.
>>>>>
>>>>> ## Control flow
>>>>> Control flow refers to the communication channels for sending
>>>>> events/signals to/between tasks/operators that change Flink's behavior in
>>>>> a way that may or may not affect the computation logic. Typical control
>>>>> events/signals Flink currently has are watermarks and checkpoint barriers.
>>>>>
>>>>> In general, for modeling control flow, the following questions should
>>>>> be considered.
>>>>> 1. Who (which component) is responsible for generating the control
>>>>> messages?
>>>>> 2. Who (which component) is responsible for reacting to the messages?
>>>>> 3. How do the messages propagate?
>>>>> 4. When it comes to affecting the computation logic, how should the
>>>>> control flow work together with the exactly-once consistency?
>>>>>
>>>>> 1) & 2) may vary depending on the use cases, while 3) & 4) probably
>>>>> share many things in common. A unified control flow model would help
>>>>> deduplicate the common logic, allowing us to focus on the parts that are
>>>>> specific to each use case.
>>>>>
>>>>> E.g.,
>>>>> - Watermarks: generated by source operators, handled by window
>>>>> operators.
>>>>> - Checkpoint barrier: generated by the checkpoint coordinator, handled
>>>>> by all tasks
>>>>> - Dynamic controlling: generated by JobMaster (in reaction to the REST
>>>>> command), handled by specific operators/UDFs
>>>>> - Operator defined events: The following features are still in
>>>>> planning, but may potentially benefit from the control flow model. (Please
>>>>> correct me if I'm wrong, @Yun, @Jark)
>>>>>   * Iteration: When a certain condition is met, we might want to
>>>>> signal downstream operators with an event
>>>>>   * Mini-batch assembling: Flink currently uses special watermarks for
>>>>> indicating the end of each mini-batch, which makes it tricky to deal with
>>>>> event time related computations.
>>>>>   * Hive dimension table join: For periodically reloaded hive tables,
>>>>> it would be helpful to have specific events signaling that a reloading is
>>>>> finished.
>>>>>   * Bootstrap dimension table join: This is similar to the previous
>>>>> one. In cases where we want to fully load the dimension table before
>>>>> starting to join the main stream, it would be helpful to have an event
>>>>> signaling that the bootstrap has finished.
>>>>>
>>>>> ## Dynamic REST controlling
>>>>> Back to the specific feature that Jiangang proposed, I personally
>>>>> think it's quite convenient. Currently, to dynamically change the behavior
>>>>> of an operator, we need to set up a separate source for the control events
>>>>> and leverage broadcast state. Being able to send the events via REST APIs
>>>>> definitely improves the usability.
>>>>>
>>>>> Leveraging dynamic configuration frameworks is for sure one possible
>>>>> approach. The reason we are in favor of introducing the control flow is
>>>>> that:
>>>>> - It benefits not only this specific dynamic controlling feature, but
>>>>> potentially other future features as well.
>>>>> - AFAICS, it's non-trivial to make a 3rd-party dynamic configuration
>>>>> framework work together with Flink's consistency mechanism.
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Jun 7, 2021 at 11:05 AM 刘建刚 <[hidden email]> wrote:
>>>>>
>>>>>> Thank you for the reply. I have checked the post you mentioned. The
>>>>>> dynamic config may be useful sometimes, but it is hard to keep data
>>>>>> consistent in Flink; for example, what if the dynamic config takes
>>>>>> effect during a failover? Since dynamic config is something users want,
>>>>>> maybe Flink can support it in some way.
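
One way to reason about the failover concern is to treat a config change as
an element at a fixed position in a replayable input and to keep the current
config in checkpointed state. Under those assumptions, restoring and
replaying gives the same results as the original run. A minimal Python
sketch follows. It is a toy model, not Flink code: `FilterOperator`, the
`"config"` element kind, and the offset-based replay are hypothetical.

```python
import copy
from typing import Dict, List, Optional, Tuple

Element = Tuple[str, int]  # ("data", value) or ("config", new_threshold)


class FilterOperator:
    """Toy filter whose threshold lives in checkpointed state (hypothetical)."""

    def __init__(self) -> None:
        self.state: Dict[str, int] = {"threshold": 0, "offset": 0}

    def process(self, element: Element) -> Optional[int]:
        kind, value = element
        self.state["offset"] += 1  # position in the replayable input
        if kind == "config":
            self.state["threshold"] = value  # applied at a fixed position
            return None
        return value if value >= self.state["threshold"] else None

    def snapshot(self) -> Dict[str, int]:
        return copy.deepcopy(self.state)

    def restore(self, snap: Dict[str, int]) -> None:
        self.state = copy.deepcopy(snap)


def run(op: FilterOperator, elements: List[Element]) -> List[int]:
    """Feed elements to the operator and collect the values it emits."""
    outputs = []
    for element in elements:
        result = op.process(element)
        if result is not None:
            outputs.append(result)
    return outputs


stream: List[Element] = [("data", 5), ("config", 10), ("data", 7), ("data", 12)]

op = FilterOperator()
before = run(op, stream[:1])   # processed before the checkpoint
checkpoint = op.snapshot()     # threshold and offset are saved together
after = run(op, stream[1:])    # the config change arrives after the checkpoint

# Failover: restore the checkpoint and replay the input from the saved offset.
recovered = FilterOperator()
recovered.restore(checkpoint)
replayed = run(recovered, stream[checkpoint["offset"]:])
```

If the config change instead arrived out of band, for example pushed
directly to the operator, the replay could see a different threshold than
the original run, which is the inconsistency described above.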
>>>>>>
>>>>>> As for the control mode, dynamic config is just one of the control
>>>>>> modes. In the Google doc, I have listed some other cases. For example,
>>>>>> control events can be generated in operators or by external services.
>>>>>> Besides users' dynamic config, the Flink system can support some common
>>>>>> dynamic configuration, like QPS limits, checkpoint control and so on.
>>>>>>
>>>>>> The control mode structure needs a good design. Based on
>>>>>> that, other control features can be added easily later, like changing the
>>>>>> log level while the job is running. In the end, Flink will not just process
>>>>>> data, but also interact with users to receive control events, like a service.
>>>>>>
>>>>>> Steven Wu <[hidden email]> wrote on Fri, Jun 4, 2021 at 11:11 PM:
>>>>>>
>>>>>>> I am not sure if we should solve this problem in Flink. This is more
>>>>>>> like a dynamic config problem that probably should be solved by some
>>>>>>> configuration framework. Here is one post from a Google search:
>>>>>>> https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
>>>>>>>
>>>>>>> On Fri, Jun 4, 2021 at 7:09 AM 刘建刚 <[hidden email]> wrote:
>>>>>>>
>>>>>>>> Hi everyone,
>>>>>>>>
>>>>>>>>       Flink jobs are always long-running. While a job is running,
>>>>>>>> users may want to control the job without stopping it. The reasons
>>>>>>>> for control vary, for example:
>>>>>>>>
>>>>>>>>    1. Change the data processing logic, such as a filter condition.
>>>>>>>>    2. Send trigger events to move progress forward.
>>>>>>>>    3. Define some tools to degrade the job, such as limiting input QPS
>>>>>>>>    or sampling data.
>>>>>>>>    4. Change the log level to debug a current problem.
>>>>>>>>
>>>>>>>>       The common way to do this is to stop the job, make the
>>>>>>>> modifications and start the job again. It may take a long time to
>>>>>>>> recover. In some situations, stopping jobs is intolerable, for example,
>>>>>>>> when the job is related to money or important activities. So we need
>>>>>>>> some technology to control the running job without stopping it.
>>>>>>>>
>>>>>>>>
>>>>>>>> We propose adding a control mode to Flink. A control mode based on
>>>>>>>> a RESTful interface is introduced first. It works in these steps:
>>>>>>>>
>>>>>>>>
>>>>>>>>    1. The user predefines some logic that supports config
>>>>>>>>    control, such as a filter condition.
>>>>>>>>    2. Run the job.
>>>>>>>>    3. If the user wants to change the job's running logic, just
>>>>>>>>    send a RESTful request with the corresponding config.
>>>>>>>>
>>>>>>>> Other control modes will also be considered in the future. For more
>>>>>>>> details, please refer to the doc
>>>>>>>> https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
>>>>>>>> . If the community likes the proposal, more discussion is needed and a
>>>>>>>> more detailed design will be given later. Any suggestions and ideas are
>>>>>>>> welcome.
>>>>>>>>
>>>>>>>>
>>>>>
>>>>>
>>>>
>>
