Hi David,

Thanks for your feedback of the FLIP. I addressed the comments above and share 
the thoughts about the question mentioned:

*About how this will work with the OperatorCoordinator for re-processing of the 
historical data using the OperatorCoordinator?*

OperatorCoordinator will checkpoint the full amount of PatternProcessor data. 
For the reprocessing of historical data, you can read the PatternProcessor 
snapshots saved by this checkpoint from a certain historical checkpoint, and 
then recreate the historical data through these PatternProcessor snapshots.

About the side-input (custom source / operator + broadcast), Becket has given 
the explanation for the OperatorCoordinator V.S. side-input / broadcast stream. 
You could share your thoughts about this.

Best,
Nicholas Jiang

On 2021/12/21 08:24:53 David Morávek wrote:
> Hi Yunfeng,
> 
> thanks for drafting this FLIP, this will be a great addition into the CEP
> toolbox!
> 
> Apart from running user code in JM, which want to avoid in general, I'd
> have one more another concern about using the OperatorCoordinator and that
> is re-processing of the historical data. Any thoughts about how this will
> work with the OC?
> 
> I have a slight feeling that a side-input (custom source / operator +
> broadcast) would a better fit for this case. This would simplify the
> consistency concerns (watermarks + pushback) and the re-processing of
> historical data.
> 
> Best,
> D.
> 
> 
> On Tue, Dec 21, 2021 at 6:47 AM Nicholas Jiang <nicholasji...@apache.org>
> wrote:
> 
> > Hi Konstantin, Martijn
> >
> > Thanks for the detailed feedback in the discussion. What I still have left
> > to answer/reply to:
> >
> > -- Martijn: Just to be sure, this indeed would mean that if for whatever
> > reason the heartbeat timeout, it would crash the job, right?
> >
> > IMO, if for whatever reason the heartbeat timeout, it couldn't check the
> > PatternProcessor consistency between the OperatorCoordinator and the
> > subtasks so that the job would be crashed.
> >
> > -- Konstantin: What I was concerned about is that we basically let users
> > run a UserFunction in the OperatorCoordinator, which it does not seem to
> > have been designed for.
> >
> > In general, we have reached an agreement on the design of this FLIP, but
> > there are some concerns on the OperatorCoordinator, about whether basically
> > let users run a UserFunction in the OperatorCoordinator is designed for
> > OperatorCoordinator. We would like to invite Becket Qin who is the author
> > of OperatorCoordinator to help us to answer this concern.
> >
> > Best,
> > Nicholas Jiang
> >
> >
> > On 2021/12/20 10:07:14 Martijn Visser wrote:
> > > Hi all,
> > >
> > > Really like the discussion on this topic moving forward. I really think
> > > this feature will be much appreciated by the Flink users. What I still
> > have
> > > left to answer/reply to:
> > >
> > > -- Good point. If for whatever reason the different taskmanagers can't
> > get
> > > the latest rule, the Operator Coordinator could send a heartbeat to all
> > > taskmanagers with the latest rules and check the heartbeat response from
> > > all the taskmanagers whether the latest rules of the taskmanager is equal
> > > to these of the Operator Coordinator.
> > >
> > > Just to be sure, this indeed would mean that if for whatever reason the
> > > heartbeat timeout, it would crash the job, right?
> > >
> > > -- We have consided about the solution mentioned above. In this
> > solution, I
> > > have some questions about how to guarantee the consistency of the rule
> > > between each TaskManager. By having a coodinator in the JobManager to
> > > centrally manage the latest rules, the latest rules of all TaskManagers
> > are
> > > consistent with those of the JobManager, so as to avoid the
> > inconsistencies
> > > that may be encountered in the above solution. Can you introduce how this
> > > solution guarantees the consistency of the rules?
> > >
> > > The consistency that we could guarantee was based on how often each
> > > TaskManager would do a refresh and how often we would accept a refresh to
> > > fail. We set the refresh time to a relatively short one (30 seconds) and
> > > maximum failures to 3. That meant that we could guarantee that rules
> > would
> > > be updated in < 2 minutes or else the job would crash. That was
> > sufficient
> > > for our use cases. This also really depends on how big your cluster is. I
> > > can imagine that if you have a large scale cluster that you want to run,
> > > you don't want to DDOS the backend system where you have your rules
> > stored.
> > >
> > > -- In summary, the current design is that JobManager tells all
> > TaskManagers
> > > the latest rules through OperatorCoodinator, and will initiate a
> > heartbeat
> > > to check whether the latest rules on each TaskManager are consistent. We
> > > will describe how to deal with the Failover scenario in more detail on
> > FLIP.
> > >
> > > Thanks for that. I think having the JobManager tell the TaskManagers the
> > > applicable rules would indeed end up being the best design.
> > >
> > > -- about the concerns around consistency raised by Martijn: I think a lot
> > > of those can be mitigated by using an event time timestamp from which the
> > > rules take effect. The reprocessing scenario, for example, is covered by
> > > this. If a pattern processor should become active as soon as possible,
> > > there will still be inconsistencies between Taskmanagers, but "as soon as
> > > possible" is vague anyway, which is why I think that's ok.
> > >
> > > I think an event timestamp is indeed a really important one. We also used
> > > that in my previous role, with the ruleActivationTimestamp compared to
> > > eventtime (well, actually we used Kafka logAppend time because
> > > eventtime wasn't always properly set so we used that time to overwrite
> > the
> > > eventtime from the event itself).
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Mon, 20 Dec 2021 at 09:08, Konstantin Knauf <kna...@apache.org>
> > wrote:
> > >
> > > > Hi Nicholas, Hi Junfeng,
> > > >
> > > > about the concerns around consistency raised by Martijn: I think a lot
> > of
> > > > those can be mitigated by using an event time timestamp from which the
> > > > rules take effect. The reprocessing scenario, for example, is covered
> > by
> > > > this. If a pattern processor should become active as soon as possible,
> > > > there will still be inconsistencies between Taskmanagers, but "as soon
> > as
> > > > possible" is vague anyway, which is why I think that's ok.
> > > >
> > > > about naming: The naming with "PatternProcessor" sounds good to me.
> > Final
> > > > nit: I would go for CEP#patternProccessors, which would be consistent
> > with
> > > > CEP#pattern.
> > > >
> > > > I am not sure about one of the rejected alternatives:
> > > >
> > > > > Have each subtask of an operator make the update on their own
> > > >
> > > >    -
> > > >
> > > >    It is hard to achieve consistency.
> > > >    -
> > > >
> > > >       Though the time interval that each subtask makes the update can
> > be
> > > >       the same, the absolute time they make the update might be
> > different.
> > > > For
> > > >       example, one makes updates at 10:00, 10:05, etc, while another
> > does
> > > > it at
> > > >       10:01, 10:06. In this case the subtasks might never processing
> > data
> > > > with
> > > >       the same set of pattern processors.
> > > >
> > > >
> > > > I would have thought that it is quite easy to poll for the rules from
> > each
> > > > Subtask at *about *the same time. So, this alone does not seem to be
> > > > enough to rule out this option. I've looped in David Moravek to get his
> > > > opinion of the additional load imposed on the JM.
> > > >
> > > > Thanks,
> > > >
> > > > Konstantin
> > > >
> > > > On Mon, Dec 20, 2021 at 4:06 AM Nicholas Jiang <
> > nicholasji...@apache.org>
> > > > wrote:
> > > >
> > > > > Hi Yue,
> > > > >
> > > > > Thanks for your feedback of the FLIP. I have addressed your
> > questions and
> > > > > made a corresponding explanation as follows:
> > > > >
> > > > > -- About Pattern Updating. If we use PatternProcessoerDiscoverer to
> > > > update
> > > > > the rules, will it increase the load of JM? For example, if the user
> > > > wants
> > > > > the updated rule to take effect immediately,, which means that we
> > need to
> > > > > set a shorter check interval or there is another scenario when users
> > > > rarely
> > > > > update the pattern, will the PatternProcessoerDiscoverer be in most
> > of
> > > > the
> > > > > time Do useless checks ? Will a lazy update mode could be used,
> > which the
> > > > > pattern only be updated when triggered by the user, and do nothing at
> > > > other
> > > > > times?
> > > > >
> > > > > PatternProcessoerDiscoverer is a user-defined interface to discover
> > the
> > > > > PatternProcessor updates. Periodically checking the PatternProcessor
> > in
> > > > the
> > > > > database is a implementation of the PatternProcessoerDiscoverer
> > > > interface,
> > > > > which is that periodically querys all the PatternProcessor table in
> > > > certain
> > > > > interval. This implementation indeeds has the useless checks, and
> > could
> > > > > directly integrates the changelog of the table. In addition, in
> > addition
> > > > to
> > > > > the implementation of periodically checking the database, there are
> > other
> > > > > implementations such as the PatternProcessor that provides Restful
> > > > services
> > > > > to receive updates.
> > > > >
> > > > > --  I still have some confusion about how Key Generating Opertator
> > and
> > > > > CepOperator (Pattern Matching & Processing Operator) work together.
> > If
> > > > > there are N PatternProcessors, will the Key Generating Opertator
> > > > generate N
> > > > > keyedStreams, and then N CepOperator would process each Key
> > separately ?
> > > > Or
> > > > > every CepOperator Task would process all patterns, if so, does the
> > key
> > > > type
> > > > > in each PatternProcessor need to be the same?
> > > > >
> > > > > Firstly the Pattern Matching & Processing Operator is not the
> > CepOperator
> > > > > at present, because CepOperator mechanism is based on the NFAState.
> > > > > Secondly if there are N PatternProcessors, the Key Generating
> > Opertator
> > > > > combines all the keyedStreams with keyBy() operation, thus the
> > Pattern
> > > > > Matching & Processing Operator would process all the patterns. In
> > other
> > > > > words, the KeySelector of the PatternProcessor is used for the Key
> > > > > Generating Opertator, and the Pattern and PatternProceessFunction of
> > the
> > > > > PatternProcessor are used for the Pattern Matching & Processing
> > Operator.
> > > > > Lastly the key type in each PatternProcessor is the same, regarded as
> > > > > Object type.
> > > > >
> > > > > -- Maybe need to pay attention to it when implementing it .If some
> > > > Pattern
> > > > > has been removed or updated, will the partially matched results in
> > > > > StateBackend would be clean up or We rely on state ttl to clean up
> > these
> > > > > expired states.
> > > > >
> > > > > If certain Pattern has been removed or updated, the partially matched
> > > > > results in StateBackend would be clean up until the next checkpoint.
> > The
> > > > > partially matched result doesn't depend on the state ttl of the
> > > > > StateBackend.
> > > > >
> > > > > 4. Will the PatternProcessorManager keep all the active
> > PatternProcessor
> > > > > in memory? We have also Support Multiple Rule and Dynamic Rule
> > Changing.
> > > > > But we are facing such a problem, some users’ usage scenarios are
> > that
> > > > they
> > > > > want to have their own pattern for each user_id, which means that
> > there
> > > > > could be thousands of patterns, which would make the performance of
> > > > Pattern
> > > > > Matching very poor. We are also trying to solve this problem.
> > > > >
> > > > > The PatternProcessorManager keeps all the active PatternProcessor in
> > > > > memory. For scenarios that they want to have their own pattern for
> > each
> > > > > user_id, IMO, is it possible to reduce the fine-grained pattern of
> > > > > PatternProcessor to solve the performance problem of the Pattern
> > > > Matching,
> > > > > for example, a pattern corresponds to a group of users? The scenarios
> > > > > mentioned above need to be solved by case by case.
> > > > >
> > > > > Best,
> > > > > Nicholas Jiang
> > > > >
> > > > > On 2021/12/17 11:43:10 yue ma wrote:
> > > > > > Glad to see the Community's progress in Flink CEP. After reading
> > this
> > > > > Flip,
> > > > > > I have few questions, would you please take a look  ?
> > > > > >
> > > > > > 1. About Pattern Updating. If we use PatternProcessoerDiscoverer to
> > > > > update
> > > > > > the rules, will it increase the load of JM? For example, if the
> > user
> > > > > wants
> > > > > > the updated rule to take effect immediately,, which means that we
> > need
> > > > to
> > > > > > set a shorter check interval  or there is another scenario when
> > users
> > > > > > rarely update the pattern, will the PatternProcessoerDiscoverer be
> > in
> > > > > most
> > > > > > of the time Do useless checks ? Will a lazy update mode could be
> > used,
> > > > > > which the pattern only be updated when triggered by the user, and
> > do
> > > > > > nothing at other times ?
> > > > > >
> > > > > > 2.   I still have some confusion about how Key Generating
> > Opertator and
> > > > > > CepOperator (Pattern Matching & Processing Operator) work
> > together. If
> > > > > > there are N PatternProcessors, will the Key Generating Opertator
> > > > > generate N
> > > > > > keyedStreams, and then N CepOperator would process each Key
> > separately
> > > > ?
> > > > > Or
> > > > > > every CepOperator Task would process all patterns, if so, does the
> > key
> > > > > type
> > > > > > in each PatternProcessor need to be the same ?
> > > > > >
> > > > > > 3. Maybe need to pay attention to it when implementing it .If some
> > > > > Pattern
> > > > > > has been removed or updateed  ,will the partially matched results
> > in
> > > > > > StateBackend would be clean up or We rely on state ttl to clean up
> > > > these
> > > > > > expired states.
> > > > > >
> > > > > > 4. Will the PatternProcessorManager keep all the active
> > > > PatternProcessor
> > > > > in
> > > > > > memory ? We have also Support Multiple Rule and Dynamic Rule
> > Changing .
> > > > > > But we are facing such a problem, some users’ usage scenarios are
> > that
> > > > > they
> > > > > > want to have their own pattern for each user_id, which means that
> > there
> > > > > > could be thousands of patterns, which would make the performance of
> > > > > Pattern
> > > > > > Matching very poor. We are also trying to solve this problem.
> > > > > >
> > > > > > Yunfeng Zhou <flink.zhouyunf...@gmail.com> 于2021年12月10日周五 19:16写道:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I'm opening this thread to propose the design to support multiple
> > > > rule
> > > > > &
> > > > > > > dynamic rule changing in the Flink-CEP project, as described in
> > > > > FLIP-200
> > > > > > > [1]
> > > > > > > .
> > > > > > >
> > > > > > > Currently Flink CEP only supports having a single pattern inside
> > a
> > > > > > > CepOperator and does not support changing the pattern
> > dynamically. In
> > > > > order
> > > > > > > to reduce resource consumption and to experience shorter downtime
> > > > > during
> > > > > > > pattern updates, there is a growing need in the production
> > > > environment
> > > > > that
> > > > > > > expects CEP to support having multiple patterns in one operator
> > and
> > > > to
> > > > > > > support dynamically changing them. Therefore I propose to add
> > certain
> > > > > > > infrastructure as described in FLIP-200 to support these
> > > > > functionalities.
> > > > > > >
> > > > > > > Please feel free to reply to this email thread. Looking forward
> > to
> > > > your
> > > > > > > feedback!
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195730308
> > > > > > >
> > > > > > > Best regards,
> > > > > > >
> > > > > > > Yunfeng
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Konstantin Knauf
> > > >
> > > > https://twitter.com/snntrable
> > > >
> > > > https://github.com/knaufk
> > > >
> > >
> >
> 

Reply via email to