> producing control events from JobMaster is similar to triggering a
savepoint.

Paul, here is what I see the difference. Upon job or jobmanager recovery,
we don't need to recover and replay the savepoint trigger signal.

On Tue, Jun 8, 2021 at 8:20 PM Paul Lam <paullin3...@gmail.com> wrote:

> +1 for this feature. Setting up a separate control stream is too much for
> many use cases, it would very helpful if users can leverage the built-in
> control flow of Flink.
>
> My 2 cents:
> 1. @Steven IMHO, producing control events from JobMaster is similar to
> triggering a savepoint. The REST api is non-blocking, and users should poll
> the results to confirm the operation is succeeded. If something goes wrong,
> it’s user’s responsibility to retry.
> 2. There are two kinds of existing special elements, special stream
> records (e.g. watermarks) and events (e.g. checkpoint barrier). They all
> flow through the whole DAG, but events needs to be acknowledged by
> downstream and can overtake records, while stream records are not). So I’m
> wondering if we plan to unify the two approaches in the new control flow
> (as Xintong mentioned both in the previous mails)?
>
> Best,
> Paul Lam
>
> 2021年6月8日 14:08,Steven Wu <stevenz...@gmail.com> 写道:
>
>
> I can see the benefits of control flow. E.g., it might help the old (and
> inactive) FLIP-17 side input. I would suggest that we add more details of
> some of the potential use cases.
>
> Here is one mismatch with using control flow for dynamic config. Dynamic
> config is typically targeted/loaded by one specific operator. Control flow
> will propagate the dynamic config to all operators. not a problem per se
>
> Regarding using the REST api (to jobmanager) for accepting control
> signals from external system, where are we going to persist/checkpoint the
> signal? jobmanager can die before the control signal is propagated and
> checkpointed. Did we lose the control signal in this case?
>
>
> On Mon, Jun 7, 2021 at 11:05 PM Xintong Song <tonysong...@gmail.com>
> wrote:
>
>> +1 on separating the effort into two steps:
>>
>>    1. Introduce a common control flow framework, with flexible
>>    interfaces for generating / reacting to control messages for various
>>    purposes.
>>    2. Features that leverating the control flow can be worked on
>>    concurrently
>>
>> Meantime, keeping collecting potential features that may leverage the
>> control flow should be helpful. It provides good inputs for the control
>> flow framework design, to make the framework common enough to cover the
>> potential use cases.
>>
>> My suggestions on the next steps:
>>
>>    1. Allow more time for opinions to be heard and potential use cases
>>    to be collected
>>    2. Draft a FLIP with the scope of common control flow framework
>>    3. We probably need a poc implementation to make sure the framework
>>    covers at least the following scenarios
>>       1. Produce control events from arbitrary operators
>>       2. Produce control events from JobMaster
>>       3. Consume control events from arbitrary operators downstream
>>       where the events are produced
>>
>>
>> Thank you~
>> Xintong Song
>>
>>
>>
>> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <yungao...@aliyun.com> wrote:
>>
>>> Very thanks Jiangang for bringing this up and very thanks for the
>>> discussion!
>>>
>>> I also agree with the summarization by Xintong and Jing that control
>>> flow seems to be
>>> a common buidling block for many functionalities and dynamic
>>> configuration framework
>>> is a representative application that frequently required by users.
>>> Regarding the control flow,
>>> currently we are also considering the design of iteration for the
>>> flink-ml, and as Xintong has pointed
>>> out, it also required the control flow in cases like detection global
>>> termination inside the iteration
>>>  (in this case we need to broadcast an event through the iteration body
>>> to detect if there are still
>>> records reside in the iteration body). And regarding  whether to
>>> implement the dynamic configuration
>>> framework, I also agree with Xintong that the consistency guarantee
>>> would be a point to consider, we
>>> might consider if we need to ensure every operator could receive the
>>> dynamic configuration.
>>>
>>> Best,
>>> Yun
>>>
>>>
>>>
>>> ------------------------------------------------------------------
>>> Sender:kai wang<yiduwang...@gmail.com>
>>> Date:2021/06/08 11:52:12
>>> Recipient:JING ZHANG<beyond1...@gmail.com>
>>> Cc:刘建刚<liujiangangp...@gmail.com>; Xintong Song [via Apache Flink User
>>> Mailing List archive.]<ml+s2336050n44245...@n4.nabble.com>; user<
>>> u...@flink.apache.org>; dev<dev@flink.apache.org>
>>> Theme:Re: Add control mode for flink
>>>
>>>
>>>
>>> I'm big +1 for this feature.
>>>
>>>    1. Limit the input qps.
>>>    2. Change log level for debug.
>>>
>>> in my team, the two examples above are needed
>>>
>>> JING ZHANG <beyond1...@gmail.com> 于2021年6月8日周二 上午11:18写道:
>>>
>>>> Thanks Jiangang for bringing this up.
>>>> As mentioned in Jiangang's email, `dynamic configuration framework`
>>>> provides many useful functions in Kuaishou, because it could update job
>>>> behavior without relaunching the job. The functions are very popular in
>>>> Kuaishou, we also see similar demands in maillist [1].
>>>>
>>>> I'm big +1 for this feature.
>>>>
>>>> Thanks Xintong and Yun for deep thoughts about the issue. I like the
>>>> idea about introducing control mode in Flink.
>>>> It takes the original issue a big step closer to essence which also
>>>> provides the possibility for more fantastic features as mentioned in
>>>> Xintong and Jark's response.
>>>> Based on the idea, there are at least two milestones to achieve the
>>>> goals which were proposed by Jiangang:
>>>> (1) Build a common control flow framework in Flink.
>>>>      It focuses on control flow propagation. And, how to integrate the
>>>> common control flow framework with existing mechanisms.
>>>> (2) Builds a dynamic configuration framework which is exposed to users
>>>> directly.
>>>>      We could see dynamic configuration framework is a top application
>>>> on the underlying control flow framework.
>>>>      It focuses on the Public API which receives configuration updating
>>>> requests from users. Besides, it is necessary to introduce an API
>>>> protection mechanism to avoid job performance degradation caused by too
>>>> many control events.
>>>>
>>>> I suggest splitting the whole design into two after we reach a
>>>> consensus on whether to introduce this feature because these two sub-topic
>>>> all need careful design.
>>>>
>>>>
>>>> [
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-configuration-of-Flink-checkpoint-interval-td44059.html
>>>> ]
>>>>
>>>> Best regards,
>>>> JING ZHANG
>>>>
>>>> 刘建刚 <liujiangangp...@gmail.com> 于2021年6月8日周二 上午10:01写道:
>>>>
>>>>> Thanks Xintong Song for the detailed supplement. Since flink is
>>>>> long-running, it is similar to many services. So interacting with it or
>>>>> controlling it is a common desire. This was our initial thought when
>>>>> implementing the feature. In our inner flink, many configs used in yaml 
>>>>> can
>>>>> be adjusted by dynamic to avoid restarting the job, for examples as 
>>>>> follow:
>>>>>
>>>>>    1. Limit the input qps.
>>>>>    2. Degrade the job by sampling and so on.
>>>>>    3. Reset kafka offset in certain cases.
>>>>>    4. Stop checkpoint in certain cases.
>>>>>    5. Control the history consuming.
>>>>>    6. Change log level for debug.
>>>>>
>>>>>
>>>>> After deep discussion, we realize that a common control flow
>>>>> will benefit both users and developers. Dynamic config is just one of the
>>>>> use cases. For the concrete design and implementation, it relates with 
>>>>> many
>>>>> components, like jobmaster, network channel, operators and so on, which
>>>>> needs deeper consideration and design.
>>>>>
>>>>> Xintong Song [via Apache Flink User Mailing List archive.] <
>>>>> ml+s2336050n44245...@n4.nabble.com> 于2021年6月7日周一 下午2:52写道:
>>>>>
>>>>>> Thanks Jiangang for bringing this up, and Steven & Peter for the
>>>>>> feedback.
>>>>>>
>>>>>> I was part of the preliminary offline discussions before this
>>>>>> proposal went public. So maybe I can help clarify things a bit.
>>>>>>
>>>>>> In short, despite the phrase "control mode" might be a bit
>>>>>> misleading, what we truly want to do from my side is to make the concept 
>>>>>> of
>>>>>> "control flow" explicit and expose it to users.
>>>>>>
>>>>>> ## Background
>>>>>> Jiangang & his colleagues at Kuaishou maintain an internal version of
>>>>>> Flink. One of their custom features is allowing dynamically changing
>>>>>> operator behaviors via the REST APIs. He's willing to contribute this
>>>>>> feature to the community, and came to Yun Gao and me for suggestions. 
>>>>>> After
>>>>>> discussion, we feel that the underlying question to be answered is how do
>>>>>> we model the control flow in Flink. Dynamically controlling jobs via REST
>>>>>> API can be one of the features built on top of the control flow, and 
>>>>>> there
>>>>>> could be others.
>>>>>>
>>>>>> ## Control flow
>>>>>> Control flow refers to the communication channels for sending
>>>>>> events/signals to/between tasks/operators, that changes Flink's behavior 
>>>>>> in
>>>>>> a way that may or may not affect the computation logic. Typical control
>>>>>> events/signals Flink currently has are watermarks and checkpoint 
>>>>>> barriers.
>>>>>>
>>>>>> In general, for modeling control flow, the following questions should
>>>>>> be considered.
>>>>>> 1. Who (which component) is responsible for generating the control
>>>>>> messages?
>>>>>> 2. Who (which component) is responsible for reacting to the messages.
>>>>>> 3. How do the messages propagate?
>>>>>> 4. When it comes to affecting the computation logics, how should the
>>>>>> control flow work together with the exact-once consistency.
>>>>>>
>>>>>> 1) & 2) may vary depending on the use cases, while 3) & 4) probably
>>>>>> share many things in common. A unified control flow model would help
>>>>>> deduplicate the common logics, allowing us to focus on the use case
>>>>>> specific parts.
>>>>>>
>>>>>> E.g.,
>>>>>> - Watermarks: generated by source operators, handled by window
>>>>>> operators.
>>>>>> - Checkpoint barrier: generated by the checkpoint coordinator,
>>>>>> handled by all tasks
>>>>>> - Dynamic controlling: generated by JobMaster (in reaction to the
>>>>>> REST command), handled by specific operators/UDFs
>>>>>> - Operator defined events: The following features are still in
>>>>>> planning, but may potentially benefit from the control flow model. 
>>>>>> (Please
>>>>>> correct me if I'm wrong, @Yun, @Jark)
>>>>>>   * Iteration: When a certain condition is met, we might want to
>>>>>> signal downstream operators with an event
>>>>>>   * Mini-batch assembling: Flink currently uses special watermarks
>>>>>> for indicating the end of each mini-batch, which makes it tricky to deal
>>>>>> with event time related computations.
>>>>>>   * Hive dimension table join: For periodically reloaded hive tables,
>>>>>> it would be helpful to have specific events signaling that a reloading is
>>>>>> finished.
>>>>>>   * Bootstrap dimension table join: This is similar to the previous
>>>>>> one. In cases where we want to fully load the dimension table before
>>>>>> starting joining the mainstream, it would be helpful to have an event
>>>>>> signaling the finishing of the bootstrap.
>>>>>>
>>>>>> ## Dynamic REST controlling
>>>>>> Back to the specific feature that Jiangang proposed, I personally
>>>>>> think it's quite convenient. Currently, to dynamically change the 
>>>>>> behavior
>>>>>> of an operator, we need to set up a separate source for the control 
>>>>>> events
>>>>>> and leverage broadcast state. Being able to send the events via REST APIs
>>>>>> definitely improves the usability.
>>>>>>
>>>>>> Leveraging dynamic configuration frameworks is for sure one possible
>>>>>> approach. The reason we are in favor of introducing the control flow is
>>>>>> that:
>>>>>> - It benefits not only this specific dynamic controlling feature, but
>>>>>> potentially other future features as well.
>>>>>> - AFAICS, it's non-trivial to make a 3rd-party dynamic configuration
>>>>>> framework work together with Flink's consistency mechanism.
>>>>>>
>>>>>> Thank you~
>>>>>> Xintong Song
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Jun 7, 2021 at 11:05 AM 刘建刚 <[hidden email]
>>>>>> <http://user/SendEmail.jtp?type=node&node=44245&i=0>> wrote:
>>>>>>
>>>>>>> Thank you for the reply. I have checked the post you mentioned. The
>>>>>>> dynamic config may be useful sometimes. But it is hard to keep data
>>>>>>> consistent in flink, for example, what if the dynamic config will take
>>>>>>> effect when failover. Since dynamic config is a desire for users, maybe
>>>>>>> flink can support it in some way.
>>>>>>>
>>>>>>> For the control mode, dynamic config is just one of the control
>>>>>>> modes. In the google doc, I have list some other cases. For example,
>>>>>>> control events are generated in operators or external services. Besides
>>>>>>> user's dynamic config, flink system can support some common dynamic
>>>>>>> configuration, like qps limit, checkpoint control and so on.
>>>>>>>
>>>>>>> It needs good design to handle the control mode structure. Based on
>>>>>>> that, other control features can be added easily later, like changing 
>>>>>>> log
>>>>>>> level when job is running. In the end, flink will not just process data,
>>>>>>> but also interact with users to receive control events like a service.
>>>>>>>
>>>>>>> Steven Wu <[hidden email]
>>>>>>> <http://user/SendEmail.jtp?type=node&node=44245&i=1>> 于2021年6月4日周五
>>>>>>> 下午11:11写道:
>>>>>>>
>>>>>>>> I am not sure if we should solve this problem in Flink. This is
>>>>>>>> more like a dynamic config problem that probably should be solved by 
>>>>>>>> some
>>>>>>>> configuration framework. Here is one post from google search:
>>>>>>>> https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
>>>>>>>>
>>>>>>>> On Fri, Jun 4, 2021 at 7:09 AM 刘建刚 <[hidden email]
>>>>>>>> <http://user/SendEmail.jtp?type=node&node=44245&i=2>> wrote:
>>>>>>>>
>>>>>>>>> Hi everyone,
>>>>>>>>>       Flink jobs are always long-running. When the job is running,
>>>>>>>>> users may want to control the job but not stop it. The control 
>>>>>>>>> reasons can
>>>>>>>>> be different as following:
>>>>>>>>>
>>>>>>>>>    1. Change data processing’ logic, such as filter condition.
>>>>>>>>>    2. Send trigger events to make the progress forward.
>>>>>>>>>    3. Define some tools to degrade the job, such as limit input
>>>>>>>>>    qps, sampling data.
>>>>>>>>>    4. Change log level to debug current problem.
>>>>>>>>>
>>>>>>>>>       The common way to do this is to stop the job, do
>>>>>>>>> modifications and start the job. It may take a long time to recover. 
>>>>>>>>> In
>>>>>>>>> some situations, stopping jobs is intolerable, for example, the job is
>>>>>>>>> related to money or important activities.So we need some
>>>>>>>>> technologies to control the running job without stopping the job.
>>>>>>>>>
>>>>>>>>> We propose to add control mode for flink. A control mode based on
>>>>>>>>> the restful interface is first introduced. It works by these steps:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>    1. The user can predefine some logic which supports config
>>>>>>>>>    control, such as filter condition.
>>>>>>>>>    2. Run the job.
>>>>>>>>>    3. If the user wants to change the job's running logic, just
>>>>>>>>>    send a restful request with the responding config.
>>>>>>>>>
>>>>>>>>> Other control modes will also be considered in the future. More
>>>>>>>>> introduction can refer to the doc
>>>>>>>>> https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
>>>>>>>>> . If the community likes the proposal, more discussion is needed and 
>>>>>>>>> a more
>>>>>>>>> detailed design will be given later. Any suggestions and ideas are 
>>>>>>>>> welcome.
>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>>> ------------------------------
>>>>>> If you reply to this email, your message will be added to the
>>>>>> discussion below:
>>>>>>
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Add-control-mode-for-flink-tp44203p44245.html
>>>>>> To start a new topic under Apache Flink User Mailing List archive.,
>>>>>> email ml+s2336050n1...@n4.nabble.com
>>>>>> To unsubscribe from Apache Flink User Mailing List archive., click
>>>>>> here
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=1&code=bGl1amlhbmdhbmdwZW5nQGdtYWlsLmNvbXwxfC0xMTYwNzM3MjI=>
>>>>>> .
>>>>>> NAML
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>>>>>>
>>>>>
>>>
>

Reply via email to