Hi David,

Thanks for your feedback on the FLIP. I have addressed the comments above and would like to share my thoughts on the question you raised:
*How will re-processing of historical data work with the OperatorCoordinator?*

The OperatorCoordinator checkpoints the full set of PatternProcessors. To re-process historical data, you can restore from a given historical checkpoint, read the PatternProcessor snapshots saved in it, and then re-process the historical data with the PatternProcessors recreated from those snapshots.

Regarding the side-input (custom source / operator + broadcast), Becket has already given an explanation of the OperatorCoordinator vs. the side-input / broadcast stream. Feel free to share your thoughts on that.

Best,
Nicholas Jiang

On 2021/12/21 08:24:53 David Morávek wrote:
> Hi Yunfeng,
>
> thanks for drafting this FLIP, this will be a great addition to the CEP toolbox!
>
> Apart from running user code in the JM, which we want to avoid in general, I have one more concern about using the OperatorCoordinator, and that is re-processing of historical data. Any thoughts about how this will work with the OC?
>
> I have a slight feeling that a side-input (custom source / operator + broadcast) would be a better fit for this case. This would simplify the consistency concerns (watermarks + pushback) and the re-processing of historical data.
>
> Best,
> D.
>
>
> On Tue, Dec 21, 2021 at 6:47 AM Nicholas Jiang <nicholasji...@apache.org> wrote:
>
> > Hi Konstantin, Martijn
> >
> > Thanks for the detailed feedback in the discussion. What I still have left to answer/reply to:
> >
> > -- Martijn: Just to be sure, this indeed would mean that if for whatever reason the heartbeat times out, it would crash the job, right?
> >
> > IMO, if for whatever reason the heartbeat times out, the PatternProcessor consistency between the OperatorCoordinator and the subtasks can't be checked, so the job would crash.
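A minimal sketch of the checkpoint-and-restore answer at the top of this reply, written in plain Java rather than against Flink's OperatorCoordinator interface. It assumes each PatternProcessor can be reduced to an id and a serialized pattern definition; `PatternProcessorSnapshotStore`, `snapshot()` and `restore()` are hypothetical names, not FLIP-200 or Flink API. Because the full set of processors is written into every checkpoint, restoring from an older checkpoint brings back exactly the processors that were active at that point, which is what re-processing historical data relies on.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical coordinator-side holder of all active PatternProcessor definitions. */
class PatternProcessorSnapshotStore {
    // processor id -> serialized pattern definition; a TreeMap keeps snapshots deterministic
    private final Map<String, String> processors = new TreeMap<>();

    void put(String id, String definition) {
        processors.put(id, definition);
    }

    void remove(String id) {
        processors.remove(id);
    }

    /** Returns a copy of the currently active processors. */
    Map<String, String> current() {
        return new TreeMap<>(processors);
    }

    /** Serializes the full set of processors, as the coordinator would do for each checkpoint. */
    byte[] snapshot() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(processors.size());
            for (Map.Entry<String, String> entry : processors.entrySet()) {
                // writeUTF limits each string to 65535 encoded bytes, which is fine for a sketch
                out.writeUTF(entry.getKey());
                out.writeUTF(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Replaces the active processors with those stored in a (possibly historical) checkpoint. */
    void restore(byte[] checkpointData) {
        Map<String, String> restored = new TreeMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(checkpointData))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String id = in.readUTF();
                restored.put(id, in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // swap only after the whole snapshot has been read successfully
        processors.clear();
        processors.putAll(restored);
    }
}
```

In a real coordinator the bytes from `snapshot()` would be stored with the checkpoint and handed back on restore; the serialization format here is only illustrative.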
> >
> > -- Konstantin: What I was concerned about is that we basically let users run a UserFunction in the OperatorCoordinator, which it does not seem to have been designed for.
> >
> > In general, we have reached agreement on the design of this FLIP, but there are some concerns about the OperatorCoordinator, namely whether it was designed to let users run a UserFunction inside it. We would like to invite Becket Qin, the author of the OperatorCoordinator, to help us answer this concern.
> >
> > Best,
> > Nicholas Jiang
> >
> >
> > On 2021/12/20 10:07:14 Martijn Visser wrote:
> > > Hi all,
> > >
> > > Really like the discussion on this topic moving forward. I really think this feature will be much appreciated by the Flink users. What I still have left to answer/reply to:
> > >
> > > -- Good point. If for whatever reason the different taskmanagers can't get the latest rule, the Operator Coordinator could send a heartbeat to all taskmanagers with the latest rules and check from the heartbeat responses whether the latest rules on each taskmanager are equal to those of the Operator Coordinator.
> > >
> > > Just to be sure, this indeed would mean that if for whatever reason the heartbeat times out, it would crash the job, right?
> > >
> > > -- We have considered the solution mentioned above. In this solution, I have some questions about how to guarantee the consistency of the rules across TaskManagers. By having a coordinator in the JobManager centrally manage the latest rules, the latest rules of all TaskManagers are consistent with those of the JobManager, which avoids the inconsistencies that may be encountered in the above solution. Can you explain how this solution guarantees the consistency of the rules?
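The heartbeat check Martijn describes, combined with the failure behaviour Nicholas confirms, could look roughly like the following. It assumes the coordinator assigns a monotonically increasing version to each published set of PatternProcessors and that each subtask reports the version it has applied in its heartbeat response; `HeartbeatConsistencyChecker` and its methods are hypothetical. If some subtask has not confirmed the latest version within the timeout, `check` throws, standing in for failing the job.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical coordinator-side bookkeeping for the heartbeat-based consistency check. */
class HeartbeatConsistencyChecker {
    private final int parallelism;
    private final long timeoutMillis;
    private long latestVersion = -1L;    // version of the latest published PatternProcessor set
    private long publishedAtMillis = 0L; // when that version was published
    // subtask index -> latest PatternProcessor version reported in a heartbeat response
    private final Map<Integer, Long> reportedVersions = new HashMap<>();

    HeartbeatConsistencyChecker(int parallelism, long timeoutMillis) {
        this.parallelism = parallelism;
        this.timeoutMillis = timeoutMillis;
    }

    /** Called when the coordinator sends a new set of PatternProcessors to all subtasks. */
    void publish(long version, long nowMillis) {
        latestVersion = version;
        publishedAtMillis = nowMillis;
    }

    /** Called for every heartbeat response carrying the version a subtask has applied. */
    void onHeartbeatResponse(int subtask, long appliedVersion) {
        reportedVersions.put(subtask, appliedVersion);
    }

    /** True if every subtask has confirmed the latest version. */
    boolean allConsistent() {
        for (int subtask = 0; subtask < parallelism; subtask++) {
            if (reportedVersions.getOrDefault(subtask, -1L) != latestVersion) {
                return false;
            }
        }
        return true;
    }

    /** Throws if some subtask has not confirmed the latest version within the timeout. */
    void check(long nowMillis) {
        if (!allConsistent() && nowMillis - publishedAtMillis > timeoutMillis) {
            throw new IllegalStateException(
                    "PatternProcessor version " + latestVersion
                            + " not confirmed by all subtasks within " + timeoutMillis + " ms");
        }
    }
}
```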
> > >
> > > The consistency that we could guarantee was based on how often each TaskManager would do a refresh and how many times we would allow a refresh to fail. We set the refresh interval to a relatively short one (30 seconds) and the maximum number of failures to 3. That meant that we could guarantee that rules would be updated in < 2 minutes or else the job would crash. That was sufficient for our use cases. This also really depends on how big your cluster is. I can imagine that if you have a large-scale cluster, you don't want to DDoS the backend system where your rules are stored.
> > >
> > > -- In summary, the current design is that the JobManager tells all TaskManagers the latest rules through the OperatorCoordinator, and will initiate a heartbeat to check whether the latest rules on each TaskManager are consistent. We will describe how to deal with the failover scenario in more detail in the FLIP.
> > >
> > > Thanks for that. I think having the JobManager tell the TaskManagers the applicable rules would indeed end up being the best design.
> > >
> > > -- about the concerns around consistency raised by Martijn: I think a lot of those can be mitigated by using an event time timestamp from which the rules take effect. The reprocessing scenario, for example, is covered by this. If a pattern processor should become active as soon as possible, there will still be inconsistencies between Taskmanagers, but "as soon as possible" is vague anyway, which is why I think that's ok.
> > >
> > > I think an event timestamp is indeed a really important one. We also used that in my previous role, with the ruleActivationTimestamp compared to event time (well, actually we used the Kafka logAppend time because event time wasn't always properly set, so we used that time to overwrite the event time from the event itself).
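The event-time activation idea from Konstantin and Martijn can be sketched as a sorted map from activation timestamp to rule version: each event is evaluated against the latest version whose activation timestamp is not after the event's own timestamp. Since the choice depends only on the event's timestamp, all subtasks, and any re-run over historical data, pick the same version for the same event. `EventTimeRuleVersions` is a hypothetical name; which timestamp is passed in (event time or, as in Martijn's setup, the Kafka log-append time) is left to the caller.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical registry of rule versions, each taking effect from a given timestamp. */
class EventTimeRuleVersions<R> {
    // activation timestamp (ms) -> rules that are in effect from that timestamp on
    private final NavigableMap<Long, R> byActivationTime = new TreeMap<>();

    void add(long activationTimestamp, R rules) {
        byActivationTime.put(activationTimestamp, rules);
    }

    /** Returns the rules in effect for an event with the given timestamp, or null if none yet. */
    R rulesFor(long eventTimestamp) {
        // floorEntry: greatest activation timestamp <= eventTimestamp
        Map.Entry<Long, R> entry = byActivationTime.floorEntry(eventTimestamp);
        return entry == null ? null : entry.getValue();
    }
}
```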
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Mon, 20 Dec 2021 at 09:08, Konstantin Knauf <kna...@apache.org> wrote:
> > >
> > > > Hi Nicholas, Hi Yunfeng,
> > > >
> > > > about the concerns around consistency raised by Martijn: I think a lot of those can be mitigated by using an event time timestamp from which the rules take effect. The reprocessing scenario, for example, is covered by this. If a pattern processor should become active as soon as possible, there will still be inconsistencies between Taskmanagers, but "as soon as possible" is vague anyway, which is why I think that's ok.
> > > >
> > > > about naming: The naming with "PatternProcessor" sounds good to me. Final nit: I would go for CEP#patternProcessors, which would be consistent with CEP#pattern.
> > > >
> > > > I am not sure about one of the rejected alternatives:
> > > >
> > > > > Have each subtask of an operator make the update on their own
> > > > >
> > > > >    - It is hard to achieve consistency.
> > > > >    - Though the time interval at which each subtask makes the update can be the same, the absolute time at which they make the update might be different. For example, one makes updates at 10:00, 10:05, etc., while another does it at 10:01, 10:06. In this case the subtasks might never process data with the same set of pattern processors.
> > > >
> > > >
> > > > I would have thought that it is quite easy to poll for the rules from each subtask at *about* the same time. So, this alone does not seem to be enough to rule out this option. I've looped in David Moravek to get his opinion on the additional load imposed on the JM.
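Konstantin's point that subtasks could poll at *about* the same time, together with Martijn's refresh budget, can be sketched as a per-subtask poller that aligns its polls to wall-clock multiples of the interval and fails after too many consecutive failed refreshes. This assumes reasonably synchronized TaskManager clocks and models a failed fetch as a `RuntimeException`; `AlignedRulePoller` and its methods are hypothetical. If the fourth consecutive failure fails the job, a 30 s interval with 3 tolerated failures gives a worst-case staleness of 30 s × (3 + 1) = 120 s, in line with the roughly 2-minute bound Martijn mentions.

```java
import java.util.function.Supplier;

/** Hypothetical per-subtask poller: aligned poll times plus a consecutive-failure budget. */
class AlignedRulePoller<R> {
    private final long intervalMillis;
    private final int maxConsecutiveFailures;
    private int consecutiveFailures = 0;
    private R currentRules;

    AlignedRulePoller(long intervalMillis, int maxConsecutiveFailures, R initialRules) {
        this.intervalMillis = intervalMillis;
        this.maxConsecutiveFailures = maxConsecutiveFailures;
        this.currentRules = initialRules;
    }

    /** Next poll time: the next multiple of the interval, so synchronized subtasks poll together. */
    long nextPollTime(long nowMillis) {
        return (nowMillis / intervalMillis + 1) * intervalMillis;
    }

    /** Polls once; keeps the old rules on failure and throws once the failure budget is exceeded. */
    R poll(Supplier<R> fetch) {
        try {
            currentRules = fetch.get();
            consecutiveFailures = 0;
        } catch (RuntimeException e) {
            consecutiveFailures++;
            if (consecutiveFailures > maxConsecutiveFailures) {
                throw new IllegalStateException(
                        "rules could not be refreshed " + consecutiveFailures + " times in a row", e);
            }
        }
        return currentRules;
    }

    /** Worst-case delay before a rule change is applied or the job fails. */
    long worstCaseStalenessMillis() {
        return intervalMillis * (maxConsecutiveFailures + 1L);
    }
}
```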
> > > >
> > > > Thanks,
> > > >
> > > > Konstantin
> > > >
> > > > On Mon, Dec 20, 2021 at 4:06 AM Nicholas Jiang <nicholasji...@apache.org> wrote:
> > > >
> > > > > Hi Yue,
> > > > >
> > > > > Thanks for your feedback on the FLIP. I have addressed your questions and made corresponding explanations as follows:
> > > > >
> > > > > -- About Pattern Updating. If we use PatternProcessorDiscoverer to update the rules, will it increase the load of the JM? For example, if the user wants the updated rule to take effect immediately, which means that we need to set a shorter check interval, or in another scenario where users rarely update the pattern, will the PatternProcessorDiscoverer be doing useless checks most of the time? Could a lazy update mode be used, in which the pattern is only updated when triggered by the user, and nothing is done at other times?
> > > > >
> > > > > PatternProcessorDiscoverer is a user-defined interface to discover PatternProcessor updates. Periodically checking the PatternProcessors in a database is one implementation of the PatternProcessorDiscoverer interface, which queries the whole PatternProcessor table at a certain interval. This implementation indeed performs useless checks, and could instead directly integrate the changelog of the table. Besides the implementation that periodically checks the database, there are other implementations, such as one that provides a RESTful service to receive PatternProcessor updates.
> > > > >
> > > > > -- I still have some confusion about how the Key Generating Operator and CepOperator (Pattern Matching & Processing Operator) work together.
> > > > > If there are N PatternProcessors, will the Key Generating Operator generate N keyed streams, and then N CepOperators process each key separately? Or will every CepOperator task process all patterns, and if so, does the key type in each PatternProcessor need to be the same?
> > > > >
> > > > > Firstly, the Pattern Matching & Processing Operator is not the CepOperator at present, because the CepOperator mechanism is based on the NFAState. Secondly, if there are N PatternProcessors, the Key Generating Operator combines all the keyed streams with a keyBy() operation, so the Pattern Matching & Processing Operator processes all the patterns. In other words, the KeySelector of the PatternProcessor is used by the Key Generating Operator, and the Pattern and PatternProcessFunction of the PatternProcessor are used by the Pattern Matching & Processing Operator. Lastly, the key type in each PatternProcessor is the same, regarded as the Object type.
> > > > >
> > > > > -- Maybe we need to pay attention to this when implementing it. If some Pattern has been removed or updated, will the partially matched results in the StateBackend be cleaned up, or do we rely on state TTL to clean up these expired states?
> > > > >
> > > > > If a certain Pattern has been removed or updated, the partially matched results in the StateBackend would be cleaned up by the next checkpoint. The cleanup of partially matched results doesn't depend on the state TTL of the StateBackend.
> > > > >
> > > > > -- Will the PatternProcessorManager keep all the active PatternProcessors in memory? We have also supported multiple rules and dynamic rule changing.
> > > > > But we are facing a problem: in some users' scenarios, they want to have their own pattern for each user_id, which means that there could be thousands of patterns, making Pattern Matching performance very poor. We are also trying to solve this problem.
> > > > >
> > > > > The PatternProcessorManager keeps all the active PatternProcessors in memory. For scenarios where users want their own pattern for each user_id, IMO, would it be possible to make the PatternProcessors coarser-grained to solve the Pattern Matching performance problem, for example, with one pattern corresponding to a group of users? Scenarios like the one mentioned above need to be solved case by case.
> > > > >
> > > > > Best,
> > > > > Nicholas Jiang
> > > > >
> > > > > On 2021/12/17 11:43:10 yue ma wrote:
> > > > > > Glad to see the community's progress on Flink CEP. After reading this FLIP, I have a few questions. Would you please take a look?
> > > > > >
> > > > > > 1. About Pattern Updating. If we use PatternProcessorDiscoverer to update the rules, will it increase the load of the JM? For example, if the user wants the updated rule to take effect immediately, which means that we need to set a shorter check interval, or in another scenario where users rarely update the pattern, will the PatternProcessorDiscoverer be doing useless checks most of the time? Could a lazy update mode be used, in which the pattern is only updated when triggered by the user, and nothing is done at other times?
> > > > > >
> > > > > > 2. I still have some confusion about how the Key Generating Operator and CepOperator (Pattern Matching & Processing Operator) work together.
> > > > > > If there are N PatternProcessors, will the Key Generating Operator generate N keyed streams, and then N CepOperators process each key separately? Or will every CepOperator task process all patterns, and if so, does the key type in each PatternProcessor need to be the same?
> > > > > >
> > > > > > 3. Maybe we need to pay attention to this when implementing it. If some Pattern has been removed or updated, will the partially matched results in the StateBackend be cleaned up, or do we rely on state TTL to clean up these expired states?
> > > > > >
> > > > > > 4. Will the PatternProcessorManager keep all the active PatternProcessors in memory? We have also supported multiple rules and dynamic rule changing. But we are facing a problem: in some users' scenarios, they want to have their own pattern for each user_id, which means that there could be thousands of patterns, making Pattern Matching performance very poor. We are also trying to solve this problem.
> > > > > >
> > > > > > On Fri, Dec 10, 2021 at 19:16, Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I'm opening this thread to propose the design to support multiple rules & dynamic rule changing in the Flink-CEP project, as described in FLIP-200 [1].
> > > > > > >
> > > > > > > Currently Flink CEP only supports having a single pattern inside a CepOperator and does not support changing the pattern dynamically.
In > > > > > order > > > > > > > to reduce resource consumption and to experience shorter downtime > > > > > during > > > > > > > pattern updates, there is a growing need in the production > > > > environment > > > > > that > > > > > > > expects CEP to support having multiple patterns in one operator > > and > > > > to > > > > > > > support dynamically changing them. Therefore I propose to add > > certain > > > > > > > infrastructure as described in FLIP-200 to support these > > > > > functionalities. > > > > > > > > > > > > > > Please feel free to reply to this email thread. Looking forward > > to > > > > your > > > > > > > feedback! > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195730308 > > > > > > > > > > > > > > Best regards, > > > > > > > > > > > > > > Yunfeng > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > Konstantin Knauf > > > > > > > > https://twitter.com/snntrable > > > > > > > > https://github.com/knaufk > > > > > > > > > >