Hi Konstantin,

Thanks for sharing your thoughts. Please see the reply inline below.

Thanks,

Jiangjie (Becket) Qin

On Tue, Dec 21, 2021 at 7:14 PM Konstantin Knauf <kna...@apache.org> wrote:

> Hi Becket, Hi Nicholas,
>
> Thanks for joining the discussion.
>
> 1 ) Personally, I would argue that we should only run user code in the
> Jobmanager/Jobmaster if we can not avoid it. It seems wrong to me to
> encourage users to e.g. run a webserver on the Jobmanager, or continuously
> read patterns from a Kafka Topic on the Jobmanager, but both of these I see
> happening with the current design. We've had lots of issues with
> classloading leaks and other stability issues on the Jobmanager and making
> this more complicated, if there is another way, seems unnecessary.


I think the key question here is what primitive does Flink provide to
facilitate the user implementation of their own control logic / control
plane? It looks that previously, Flink assumes that all the user logic is
just data processing logic without any control / coordination requirements.
However, it turns out that a decent control plane abstraction is required
in association with the data processing logic in many cases, including
Source / Sink and other user defined operators in general. The fact that we
ended up with adding the SplitEnumerator and GlobalCommitter are just two
examples of the demand of such coordination among user defined logics.
There are other cases that we see in ecosystem projects, such as
deep-learning-on-flink[1]. Now we see this again in CEP.

Such control plane primitives are critical to the extensibility of a
project. If we look at other projects, exposing such control plane logic is
quite common. For example, Hadoop ended up with exposing YARN as a public
API to the users, which is extremely popular. Kafka consumers exposed the
consumer group rebalance logic to the users via ConsumerPartitionAssigner,
which is also a control plane primitive.

To me it is more important to think about how we can improve the stability
of such a control plane mechanism, instead of simply saying no to the users.




> 2) In addition, I suspect that, over time we will have to implement all the
> functionality that regular sources already provide around consistency
> (watermarks, checkpointing) for the PatternProcessorCoordinator, too.


I think OperatorCoordinator should have a generic communication mechanism
for all the operators, not specific to Source. We should probably have an
AbstractOperatorCoordinator help dealing with the communication layer, and
leave the state maintenance and event handling logic to the user code.


> 3) I understand that running on the Jobmanager is easier if you want to
> launch a REST server directly. Here my question would be: does this really
> need to be solved inside of Flink or couldn't you start a webserver next to
> Flink? If we start using the Jobmanager as a REST server users will expect
> that e.g. it is highly available and can be load balanced and we quickly
> need to think about aspects that we never wanted to think about in the
> context of a Flink Jobmanager.
>

I think the REST API is just for receiving commands targeting a running
Flink job. If the job fails, the REST API would be useless.


> So, can you elaborate a bit more, why a side-input/broadcast stream is
>
> a) more difficult
> b) has vague semantics (To me semantics of a stream-stream seem clearer
> when it comes to out-of-orderness, late data, reprocessing or batch
> execution mode.)


I do agree that having the user defined control logic defined in the JM
increases the chance of instability. In that case, we may think of other
solutions and I am fully open to that. But the side-input / broadcast
stream seems more like a bandaid instead of a carefully designed control
plane mechanism.

A decent control plane requires two-way communication, so information can
be reported / collected from the entity being controlled, and the
coordinator / controller can send decisions or commands to the entities
accordingly, just like our TM / JM communication. IIUC, this is not
achievable with the existing side-input / broadcast stream as both of them
are one-way communication mechanisms. For instance, the example I gave in
my previous email seems not easily achievable with side-input / broadcast
streams: a single invalid pattern detected on a TM can be disabled
elegantly globally without crashing the entire Flink job.


> Cheers,
>
> Konstantin
>
>
> On Tue, Dec 21, 2021 at 11:38 AM Becket Qin <becket....@gmail.com> wrote:
>
> > Hi folks,
> >
> > I just finished reading the previous emails. It was a good discussion.
> Here
> > are my two cents:
> >
> > *Is OperatorCoordinator a public API?*
> > Regarding the OperatorCoordinator, although it is marked as internal
> > interface at this point, I intended to make it a public interface when
> add
> > that in FLIP-27. This is a powerful cross-subtask communication mechanism
> > that enables many use cases, Source / Sink / TF on Flink / CEP here
> again.
> > To my understanding, OC was marked as internal because we think it is not
> > stable enough yet. We may need to fortify the OperatorEvent delivery
> > semantic a little bit so it works well with checkpoint in general.
> >
> > I think it is a valid concern that user code running in JM may cause
> > instability. However, not providing this communication mechanism only
> makes
> > a lot of use case even harder to implement. So it seems that having the
> OC
> > exposed to end users brings more benefits than harm to Flink. At the end
> of
> > the day, users have many ways to crash a Flink job if they do not write
> > proper code. So making those who know what they do happy seems more
> > important here.
> >
> > *OperatorCoordinator V.S. side-input / broadcast stream*
> > I think both of them can achieve the goal of dynamic patterns. The main
> > difference is the extensibility.
> >
> > OC is a 2-way communication mechanism, i.e. a subtask can also send
> > OperatorEvent to the coordinator to report its owns status, so that the
> > coordinator can act accordingly. This is sometimes useful. For example, a
> > single invalid pattern can be disabled elegantly without crashing the
> > entire Flink job. In the future, if we allow users to send external
> command
> > to OC via JM, a default self-contained implementation can just update the
> > pattern via the REST API without external dependencies.
> >
> > Another reason I personally prefer OC is because it is an explicit
> control
> > plain mechanism, where as the side-input / broadcast stream has are more
> > vague semantic.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Dec 21, 2021 at 4:25 PM David Morávek <d...@apache.org> wrote:
> >
> > > Hi Yunfeng,
> > >
> > > thanks for drafting this FLIP, this will be a great addition into the
> CEP
> > > toolbox!
> > >
> > > Apart from running user code in JM, which want to avoid in general, I'd
> > > have one more another concern about using the OperatorCoordinator and
> > that
> > > is re-processing of the historical data. Any thoughts about how this
> will
> > > work with the OC?
> > >
> > > I have a slight feeling that a side-input (custom source / operator +
> > > broadcast) would a better fit for this case. This would simplify the
> > > consistency concerns (watermarks + pushback) and the re-processing of
> > > historical data.
> > >
> > > Best,
> > > D.
> > >
> > >
> > > On Tue, Dec 21, 2021 at 6:47 AM Nicholas Jiang <
> nicholasji...@apache.org
> > >
> > > wrote:
> > >
> > > > Hi Konstantin, Martijn
> > > >
> > > > Thanks for the detailed feedback in the discussion. What I still have
> > > left
> > > > to answer/reply to:
> > > >
> > > > -- Martijn: Just to be sure, this indeed would mean that if for
> > whatever
> > > > reason the heartbeat timeout, it would crash the job, right?
> > > >
> > > > IMO, if for whatever reason the heartbeat timeout, it couldn't check
> > the
> > > > PatternProcessor consistency between the OperatorCoordinator and the
> > > > subtasks so that the job would be crashed.
> > > >
> > > > -- Konstantin: What I was concerned about is that we basically let
> > users
> > > > run a UserFunction in the OperatorCoordinator, which it does not seem
> > to
> > > > have been designed for.
> > > >
> > > > In general, we have reached an agreement on the design of this FLIP,
> > but
> > > > there are some concerns on the OperatorCoordinator, about whether
> > > basically
> > > > let users run a UserFunction in the OperatorCoordinator is designed
> for
> > > > OperatorCoordinator. We would like to invite Becket Qin who is the
> > author
> > > > of OperatorCoordinator to help us to answer this concern.
> > > >
> > > > Best,
> > > > Nicholas Jiang
> > > >
> > > >
> > > > On 2021/12/20 10:07:14 Martijn Visser wrote:
> > > > > Hi all,
> > > > >
> > > > > Really like the discussion on this topic moving forward. I really
> > think
> > > > > this feature will be much appreciated by the Flink users. What I
> > still
> > > > have
> > > > > left to answer/reply to:
> > > > >
> > > > > -- Good point. If for whatever reason the different taskmanagers
> > can't
> > > > get
> > > > > the latest rule, the Operator Coordinator could send a heartbeat to
> > all
> > > > > taskmanagers with the latest rules and check the heartbeat response
> > > from
> > > > > all the taskmanagers whether the latest rules of the taskmanager is
> > > equal
> > > > > to these of the Operator Coordinator.
> > > > >
> > > > > Just to be sure, this indeed would mean that if for whatever reason
> > the
> > > > > heartbeat timeout, it would crash the job, right?
> > > > >
> > > > > -- We have consided about the solution mentioned above. In this
> > > > solution, I
> > > > > have some questions about how to guarantee the consistency of the
> > rule
> > > > > between each TaskManager. By having a coodinator in the JobManager
> to
> > > > > centrally manage the latest rules, the latest rules of all
> > TaskManagers
> > > > are
> > > > > consistent with those of the JobManager, so as to avoid the
> > > > inconsistencies
> > > > > that may be encountered in the above solution. Can you introduce
> how
> > > this
> > > > > solution guarantees the consistency of the rules?
> > > > >
> > > > > The consistency that we could guarantee was based on how often each
> > > > > TaskManager would do a refresh and how often we would accept a
> > refresh
> > > to
> > > > > fail. We set the refresh time to a relatively short one (30
> seconds)
> > > and
> > > > > maximum failures to 3. That meant that we could guarantee that
> rules
> > > > would
> > > > > be updated in < 2 minutes or else the job would crash. That was
> > > > sufficient
> > > > > for our use cases. This also really depends on how big your cluster
> > > is. I
> > > > > can imagine that if you have a large scale cluster that you want to
> > > run,
> > > > > you don't want to DDOS the backend system where you have your rules
> > > > stored.
> > > > >
> > > > > -- In summary, the current design is that JobManager tells all
> > > > TaskManagers
> > > > > the latest rules through OperatorCoodinator, and will initiate a
> > > > heartbeat
> > > > > to check whether the latest rules on each TaskManager are
> consistent.
> > > We
> > > > > will describe how to deal with the Failover scenario in more detail
> > on
> > > > FLIP.
> > > > >
> > > > > Thanks for that. I think having the JobManager tell the
> TaskManagers
> > > the
> > > > > applicable rules would indeed end up being the best design.
> > > > >
> > > > > -- about the concerns around consistency raised by Martijn: I
> think a
> > > lot
> > > > > of those can be mitigated by using an event time timestamp from
> which
> > > the
> > > > > rules take effect. The reprocessing scenario, for example, is
> covered
> > > by
> > > > > this. If a pattern processor should become active as soon as
> > possible,
> > > > > there will still be inconsistencies between Taskmanagers, but "as
> > soon
> > > as
> > > > > possible" is vague anyway, which is why I think that's ok.
> > > > >
> > > > > I think an event timestamp is indeed a really important one. We
> also
> > > used
> > > > > that in my previous role, with the ruleActivationTimestamp compared
> > to
> > > > > eventtime (well, actually we used Kafka logAppend time because
> > > > > eventtime wasn't always properly set so we used that time to
> > overwrite
> > > > the
> > > > > eventtime from the event itself).
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Martijn
> > > > >
> > > > > On Mon, 20 Dec 2021 at 09:08, Konstantin Knauf <kna...@apache.org>
> > > > wrote:
> > > > >
> > > > > > Hi Nicholas, Hi Junfeng,
> > > > > >
> > > > > > about the concerns around consistency raised by Martijn: I think
> a
> > > lot
> > > > of
> > > > > > those can be mitigated by using an event time timestamp from
> which
> > > the
> > > > > > rules take effect. The reprocessing scenario, for example, is
> > covered
> > > > by
> > > > > > this. If a pattern processor should become active as soon as
> > > possible,
> > > > > > there will still be inconsistencies between Taskmanagers, but "as
> > > soon
> > > > as
> > > > > > possible" is vague anyway, which is why I think that's ok.
> > > > > >
> > > > > > about naming: The naming with "PatternProcessor" sounds good to
> me.
> > > > Final
> > > > > > nit: I would go for CEP#patternProccessors, which would be
> > consistent
> > > > with
> > > > > > CEP#pattern.
> > > > > >
> > > > > > I am not sure about one of the rejected alternatives:
> > > > > >
> > > > > > > Have each subtask of an operator make the update on their own
> > > > > >
> > > > > >    -
> > > > > >
> > > > > >    It is hard to achieve consistency.
> > > > > >    -
> > > > > >
> > > > > >       Though the time interval that each subtask makes the update
> > can
> > > > be
> > > > > >       the same, the absolute time they make the update might be
> > > > different.
> > > > > > For
> > > > > >       example, one makes updates at 10:00, 10:05, etc, while
> > another
> > > > does
> > > > > > it at
> > > > > >       10:01, 10:06. In this case the subtasks might never
> > processing
> > > > data
> > > > > > with
> > > > > >       the same set of pattern processors.
> > > > > >
> > > > > >
> > > > > > I would have thought that it is quite easy to poll for the rules
> > from
> > > > each
> > > > > > Subtask at *about *the same time. So, this alone does not seem to
> > be
> > > > > > enough to rule out this option. I've looped in David Moravek to
> get
> > > his
> > > > > > opinion of the additional load imposed on the JM.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Konstantin
> > > > > >
> > > > > > On Mon, Dec 20, 2021 at 4:06 AM Nicholas Jiang <
> > > > nicholasji...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Yue,
> > > > > > >
> > > > > > > Thanks for your feedback of the FLIP. I have addressed your
> > > > questions and
> > > > > > > made a corresponding explanation as follows:
> > > > > > >
> > > > > > > -- About Pattern Updating. If we use
> PatternProcessoerDiscoverer
> > to
> > > > > > update
> > > > > > > the rules, will it increase the load of JM? For example, if the
> > > user
> > > > > > wants
> > > > > > > the updated rule to take effect immediately,, which means that
> we
> > > > need to
> > > > > > > set a shorter check interval or there is another scenario when
> > > users
> > > > > > rarely
> > > > > > > update the pattern, will the PatternProcessoerDiscoverer be in
> > most
> > > > of
> > > > > > the
> > > > > > > time Do useless checks ? Will a lazy update mode could be used,
> > > > which the
> > > > > > > pattern only be updated when triggered by the user, and do
> > nothing
> > > at
> > > > > > other
> > > > > > > times?
> > > > > > >
> > > > > > > PatternProcessoerDiscoverer is a user-defined interface to
> > discover
> > > > the
> > > > > > > PatternProcessor updates. Periodically checking the
> > > PatternProcessor
> > > > in
> > > > > > the
> > > > > > > database is a implementation of the PatternProcessoerDiscoverer
> > > > > > interface,
> > > > > > > which is that periodically querys all the PatternProcessor
> table
> > in
> > > > > > certain
> > > > > > > interval. This implementation indeeds has the useless checks,
> and
> > > > could
> > > > > > > directly integrates the changelog of the table. In addition, in
> > > > addition
> > > > > > to
> > > > > > > the implementation of periodically checking the database, there
> > are
> > > > other
> > > > > > > implementations such as the PatternProcessor that provides
> > Restful
> > > > > > services
> > > > > > > to receive updates.
> > > > > > >
> > > > > > > --  I still have some confusion about how Key Generating
> > Opertator
> > > > and
> > > > > > > CepOperator (Pattern Matching & Processing Operator) work
> > together.
> > > > If
> > > > > > > there are N PatternProcessors, will the Key Generating
> Opertator
> > > > > > generate N
> > > > > > > keyedStreams, and then N CepOperator would process each Key
> > > > separately ?
> > > > > > Or
> > > > > > > every CepOperator Task would process all patterns, if so, does
> > the
> > > > key
> > > > > > type
> > > > > > > in each PatternProcessor need to be the same?
> > > > > > >
> > > > > > > Firstly the Pattern Matching & Processing Operator is not the
> > > > CepOperator
> > > > > > > at present, because CepOperator mechanism is based on the
> > NFAState.
> > > > > > > Secondly if there are N PatternProcessors, the Key Generating
> > > > Opertator
> > > > > > > combines all the keyedStreams with keyBy() operation, thus the
> > > > Pattern
> > > > > > > Matching & Processing Operator would process all the patterns.
> In
> > > > other
> > > > > > > words, the KeySelector of the PatternProcessor is used for the
> > Key
> > > > > > > Generating Opertator, and the Pattern and
> PatternProceessFunction
> > > of
> > > > the
> > > > > > > PatternProcessor are used for the Pattern Matching & Processing
> > > > Operator.
> > > > > > > Lastly the key type in each PatternProcessor is the same,
> > regarded
> > > as
> > > > > > > Object type.
> > > > > > >
> > > > > > > -- Maybe need to pay attention to it when implementing it .If
> > some
> > > > > > Pattern
> > > > > > > has been removed or updated, will the partially matched results
> > in
> > > > > > > StateBackend would be clean up or We rely on state ttl to clean
> > up
> > > > these
> > > > > > > expired states.
> > > > > > >
> > > > > > > If certain Pattern has been removed or updated, the partially
> > > matched
> > > > > > > results in StateBackend would be clean up until the next
> > > checkpoint.
> > > > The
> > > > > > > partially matched result doesn't depend on the state ttl of the
> > > > > > > StateBackend.
> > > > > > >
> > > > > > > 4. Will the PatternProcessorManager keep all the active
> > > > PatternProcessor
> > > > > > > in memory? We have also Support Multiple Rule and Dynamic Rule
> > > > Changing.
> > > > > > > But we are facing such a problem, some users’ usage scenarios
> are
> > > > that
> > > > > > they
> > > > > > > want to have their own pattern for each user_id, which means
> that
> > > > there
> > > > > > > could be thousands of patterns, which would make the
> performance
> > of
> > > > > > Pattern
> > > > > > > Matching very poor. We are also trying to solve this problem.
> > > > > > >
> > > > > > > The PatternProcessorManager keeps all the active
> PatternProcessor
> > > in
> > > > > > > memory. For scenarios that they want to have their own pattern
> > for
> > > > each
> > > > > > > user_id, IMO, is it possible to reduce the fine-grained pattern
> > of
> > > > > > > PatternProcessor to solve the performance problem of the
> Pattern
> > > > > > Matching,
> > > > > > > for example, a pattern corresponds to a group of users? The
> > > scenarios
> > > > > > > mentioned above need to be solved by case by case.
> > > > > > >
> > > > > > > Best,
> > > > > > > Nicholas Jiang
> > > > > > >
> > > > > > > On 2021/12/17 11:43:10 yue ma wrote:
> > > > > > > > Glad to see the Community's progress in Flink CEP. After
> > reading
> > > > this
> > > > > > > Flip,
> > > > > > > > I have few questions, would you please take a look  ?
> > > > > > > >
> > > > > > > > 1. About Pattern Updating. If we use
> > PatternProcessoerDiscoverer
> > > to
> > > > > > > update
> > > > > > > > the rules, will it increase the load of JM? For example, if
> the
> > > > user
> > > > > > > wants
> > > > > > > > the updated rule to take effect immediately,, which means
> that
> > we
> > > > need
> > > > > > to
> > > > > > > > set a shorter check interval  or there is another scenario
> when
> > > > users
> > > > > > > > rarely update the pattern, will the
> PatternProcessoerDiscoverer
> > > be
> > > > in
> > > > > > > most
> > > > > > > > of the time Do useless checks ? Will a lazy update mode could
> > be
> > > > used,
> > > > > > > > which the pattern only be updated when triggered by the user,
> > and
> > > > do
> > > > > > > > nothing at other times ?
> > > > > > > >
> > > > > > > > 2.   I still have some confusion about how Key Generating
> > > > Opertator and
> > > > > > > > CepOperator (Pattern Matching & Processing Operator) work
> > > > together. If
> > > > > > > > there are N PatternProcessors, will the Key Generating
> > Opertator
> > > > > > > generate N
> > > > > > > > keyedStreams, and then N CepOperator would process each Key
> > > > separately
> > > > > > ?
> > > > > > > Or
> > > > > > > > every CepOperator Task would process all patterns, if so,
> does
> > > the
> > > > key
> > > > > > > type
> > > > > > > > in each PatternProcessor need to be the same ?
> > > > > > > >
> > > > > > > > 3. Maybe need to pay attention to it when implementing it .If
> > > some
> > > > > > > Pattern
> > > > > > > > has been removed or updateed  ,will the partially matched
> > results
> > > > in
> > > > > > > > StateBackend would be clean up or We rely on state ttl to
> clean
> > > up
> > > > > > these
> > > > > > > > expired states.
> > > > > > > >
> > > > > > > > 4. Will the PatternProcessorManager keep all the active
> > > > > > PatternProcessor
> > > > > > > in
> > > > > > > > memory ? We have also Support Multiple Rule and Dynamic Rule
> > > > Changing .
> > > > > > > > But we are facing such a problem, some users’ usage scenarios
> > are
> > > > that
> > > > > > > they
> > > > > > > > want to have their own pattern for each user_id, which means
> > that
> > > > there
> > > > > > > > could be thousands of patterns, which would make the
> > performance
> > > of
> > > > > > > Pattern
> > > > > > > > Matching very poor. We are also trying to solve this problem.
> > > > > > > >
> > > > > > > > Yunfeng Zhou <flink.zhouyunf...@gmail.com> 于2021年12月10日周五
> > > 19:16写道:
> > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > I'm opening this thread to propose the design to support
> > > multiple
> > > > > > rule
> > > > > > > &
> > > > > > > > > dynamic rule changing in the Flink-CEP project, as
> described
> > in
> > > > > > > FLIP-200
> > > > > > > > > [1]
> > > > > > > > > .
> > > > > > > > >
> > > > > > > > > Currently Flink CEP only supports having a single pattern
> > > inside
> > > > a
> > > > > > > > > CepOperator and does not support changing the pattern
> > > > dynamically. In
> > > > > > > order
> > > > > > > > > to reduce resource consumption and to experience shorter
> > > downtime
> > > > > > > during
> > > > > > > > > pattern updates, there is a growing need in the production
> > > > > > environment
> > > > > > > that
> > > > > > > > > expects CEP to support having multiple patterns in one
> > operator
> > > > and
> > > > > > to
> > > > > > > > > support dynamically changing them. Therefore I propose to
> add
> > > > certain
> > > > > > > > > infrastructure as described in FLIP-200 to support these
> > > > > > > functionalities.
> > > > > > > > >
> > > > > > > > > Please feel free to reply to this email thread. Looking
> > forward
> > > > to
> > > > > > your
> > > > > > > > > feedback!
> > > > > > > > >
> > > > > > > > > [1]
> > > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195730308
> > > > > > > > >
> > > > > > > > > Best regards,
> > > > > > > > >
> > > > > > > > > Yunfeng
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Konstantin Knauf
> > > > > >
> > > > > > https://twitter.com/snntrable
> > > > > >
> > > > > > https://github.com/knaufk
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk
>

Reply via email to