Hi Konstantin, Becket, Martijn,

Thanks for sharing your feedback. What other concerns do you have about OperatorCoordinator? If an agreement is reached on OperatorCoordinator, I will start the voting thread.

Best, Nicholas Jiang On 2021/12/22 03:19:58 Becket Qin wrote: > Hi Konstantin, > > Thanks for sharing your thoughts. Please see the reply inline below. > > Thanks, > > Jiangjie (Becket) Qin > > On Tue, Dec 21, 2021 at 7:14 PM Konstantin Knauf <kna...@apache.org> wrote: > > > Hi Becket, Hi Nicholas, > > > > Thanks for joining the discussion. > > > > 1 ) Personally, I would argue that we should only run user code in the > > Jobmanager/Jobmaster if we can not avoid it. It seems wrong to me to > > encourage users to e.g. run a webserver on the Jobmanager, or continuously > > read patterns from a Kafka Topic on the Jobmanager, but both of these I see > > happening with the current design. We've had lots of issues with > > classloading leaks and other stability issues on the Jobmanager and making > > this more complicated, if there is another way, seems unnecessary. > > > I think the key question here is what primitive does Flink provide to > facilitate the user implementation of their own control logic / control > plane? It looks that previously, Flink assumes that all the user logic is > just data processing logic without any control / coordination requirements. > However, it turns out that a decent control plane abstraction is required > in association with the data processing logic in many cases, including > Source / Sink and other user defined operators in general. The fact that we > ended up with adding the SplitEnumerator and GlobalCommitter are just two > examples of the demand of such coordination among user defined logics. > There are other cases that we see in ecosystem projects, such as > deep-learning-on-flink[1]. Now we see this again in CEP. > > Such control plane primitives are critical to the extensibility of a > project. If we look at other projects, exposing such control plane logic is > quite common. For example, Hadoop ended up with exposing YARN as a public > API to the users, which is extremely popular. 
Kafka consumers exposed the > consumer group rebalance logic to the users via ConsumerPartitionAssigner, > which is also a control plane primitive. > > To me it is more important to think about how we can improve the stability > of such a control plane mechanism, instead of simply saying no to the users. > > > > > > 2) In addition, I suspect that, over time we will have to implement all the > > functionality that regular sources already provide around consistency > > (watermarks, checkpointing) for the PatternProcessorCoordinator, too. > > > I think OperatorCoordinator should have a generic communication mechanism > for all the operators, not specific to Source. We should probably have an > AbstractOperatorCoordinator help dealing with the communication layer, and > leave the state maintenance and event handling logic to the user code. > > > > 3) I understand that running on the Jobmanager is easier if you want to > > launch a REST server directly. Here my question would be: does this really > > need to be solved inside of Flink or couldn't you start a webserver next to > > Flink? If we start using the Jobmanager as a REST server users will expect > > that e.g. it is highly available and can be load balanced and we quickly > > need to think about aspects that we never wanted to think about in the > > context of a Flink Jobmanager. > > > > I think the REST API is just for receiving commands targeting a running > Flink job. If the job fails, the REST API would be useless. > > > > So, can you elaborate a bit more, why a side-input/broadcast stream is > > > > a) more difficult > > b) has vague semantics (To me semantics of a stream-stream seem clearer > > when it comes to out-of-orderness, late data, reprocessing or batch > > execution mode.) > > > I do agree that having the user defined control logic defined in the JM > increases the chance of instability. In that case, we may think of other > solutions and I am fully open to that. 
But the side-input / broadcast > stream seems more like a bandaid instead of a carefully designed control > plane mechanism. > > A decent control plane requires two-way communication, so information can > be reported / collected from the entity being controlled, and the > coordinator / controller can send decisions or commands to the entities > accordingly, just like our TM / JM communication. IIUC, this is not > achievable with the existing side-input / broadcast stream as both of them > are one-way communication mechanisms. For instance, the example I gave in > my previous email seems not easily achievable with side-input / broadcast > streams: a single invalid pattern detected on a TM can be disabled > elegantly globally without crashing the entire Flink job. > > > > Cheers, > > > > Konstantin > > > > > > On Tue, Dec 21, 2021 at 11:38 AM Becket Qin <becket....@gmail.com> wrote: > > > > > Hi folks, > > > > > > I just finished reading the previous emails. It was a good discussion. > > Here > > > are my two cents: > > > > > > *Is OperatorCoordinator a public API?* > > > Regarding the OperatorCoordinator, although it is marked as internal > > > interface at this point, I intended to make it a public interface when > > add > > > that in FLIP-27. This is a powerful cross-subtask communication mechanism > > > that enables many use cases, Source / Sink / TF on Flink / CEP here > > again. > > > To my understanding, OC was marked as internal because we think it is not > > > stable enough yet. We may need to fortify the OperatorEvent delivery > > > semantic a little bit so it works well with checkpoint in general. > > > > > > I think it is a valid concern that user code running in JM may cause > > > instability. However, not providing this communication mechanism only > > makes > > > a lot of use case even harder to implement. So it seems that having the > > OC > > > exposed to end users brings more benefits than harm to Flink. 
At the end > > of > > > the day, users have many ways to crash a Flink job if they do not write > > > proper code. So making those who know what they do happy seems more > > > important here. > > > > > > *OperatorCoordinator V.S. side-input / broadcast stream* > > > I think both of them can achieve the goal of dynamic patterns. The main > > > difference is the extensibility. > > > > > > OC is a 2-way communication mechanism, i.e. a subtask can also send > > > OperatorEvent to the coordinator to report its owns status, so that the > > > coordinator can act accordingly. This is sometimes useful. For example, a > > > single invalid pattern can be disabled elegantly without crashing the > > > entire Flink job. In the future, if we allow users to send external > > command > > > to OC via JM, a default self-contained implementation can just update the > > > pattern via the REST API without external dependencies. > > > > > > Another reason I personally prefer OC is because it is an explicit > > control > > > plain mechanism, where as the side-input / broadcast stream has are more > > > vague semantic. > > > > > > Thanks, > > > > > > Jiangjie (Becket) Qin > > > > > > On Tue, Dec 21, 2021 at 4:25 PM David Morávek <d...@apache.org> wrote: > > > > > > > Hi Yunfeng, > > > > > > > > thanks for drafting this FLIP, this will be a great addition into the > > CEP > > > > toolbox! > > > > > > > > Apart from running user code in JM, which want to avoid in general, I'd > > > > have one more another concern about using the OperatorCoordinator and > > > that > > > > is re-processing of the historical data. Any thoughts about how this > > will > > > > work with the OC? > > > > > > > > I have a slight feeling that a side-input (custom source / operator + > > > > broadcast) would a better fit for this case. This would simplify the > > > > consistency concerns (watermarks + pushback) and the re-processing of > > > > historical data. > > > > > > > > Best, > > > > D. 
> > > > > > > > > > > > On Tue, Dec 21, 2021 at 6:47 AM Nicholas Jiang < > > nicholasji...@apache.org > > > > > > > > wrote: > > > > > > > > > Hi Konstantin, Martijn > > > > > > > > > > Thanks for the detailed feedback in the discussion. What I still have > > > > left > > > > > to answer/reply to: > > > > > > > > > > -- Martijn: Just to be sure, this indeed would mean that if for > > > whatever > > > > > reason the heartbeat timeout, it would crash the job, right? > > > > > > > > > > IMO, if for whatever reason the heartbeat timeout, it couldn't check > > > the > > > > > PatternProcessor consistency between the OperatorCoordinator and the > > > > > subtasks so that the job would be crashed. > > > > > > > > > > -- Konstantin: What I was concerned about is that we basically let > > > users > > > > > run a UserFunction in the OperatorCoordinator, which it does not seem > > > to > > > > > have been designed for. > > > > > > > > > > In general, we have reached an agreement on the design of this FLIP, > > > but > > > > > there are some concerns on the OperatorCoordinator, about whether > > > > basically > > > > > let users run a UserFunction in the OperatorCoordinator is designed > > for > > > > > OperatorCoordinator. We would like to invite Becket Qin who is the > > > author > > > > > of OperatorCoordinator to help us to answer this concern. > > > > > > > > > > Best, > > > > > Nicholas Jiang > > > > > > > > > > > > > > > On 2021/12/20 10:07:14 Martijn Visser wrote: > > > > > > Hi all, > > > > > > > > > > > > Really like the discussion on this topic moving forward. I really > > > think > > > > > > this feature will be much appreciated by the Flink users. What I > > > still > > > > > have > > > > > > left to answer/reply to: > > > > > > > > > > > > -- Good point. 
If for whatever reason the different taskmanagers can't get the latest rule, the Operator Coordinator could send a heartbeat to all taskmanagers with the latest rules and check the heartbeat response from all the taskmanagers whether the latest rules of the taskmanager are equal to those of the Operator Coordinator.
> > > > > >
> > > > > > Just to be sure, this indeed would mean that if for whatever reason the heartbeat times out, it would crash the job, right?
> > > > > >
> > > > > > -- We have considered the solution mentioned above. In this solution, I have some questions about how to guarantee the consistency of the rule between each TaskManager. By having a coordinator in the JobManager to centrally manage the latest rules, the latest rules of all TaskManagers are consistent with those of the JobManager, so as to avoid the inconsistencies that may be encountered in the above solution. Can you introduce how this solution guarantees the consistency of the rules?
> > > > > >
> > > > > > The consistency that we could guarantee was based on how often each TaskManager would do a refresh and how often we would accept a refresh to fail. We set the refresh time to a relatively short one (30 seconds) and maximum failures to 3. That meant that we could guarantee that rules would be updated in < 2 minutes or else the job would crash. That was sufficient for our use cases. This also really depends on how big your cluster is. I can imagine that if you have a large scale cluster that you want to run, you don't want to DDOS the backend system where you have your rules stored.
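
The refresh numbers quoted above (30 second interval, at most 3 failures, rules updated in under 2 minutes or the job crashes) can be illustrated with a small sketch. This is not Flink code and not the implementation Martijn describes; `RuleRefresher`, `RefreshFailed`, `fetch_rules` and `staleness_bound_seconds` are hypothetical names, and the sketch assumes the job fails on the third consecutive failed refresh and ignores fetch latency and scheduling jitter.

```python
class RefreshFailed(Exception):
    """Stands in for failing the job after too many failed refreshes."""


class RuleRefresher:
    """Polls a rule store and gives up after max_failures consecutive errors."""

    def __init__(self, fetch_rules, max_failures=3):
        self._fetch_rules = fetch_rules          # hypothetical callable returning the latest rules
        self._max_failures = max_failures
        self._consecutive_failures = 0
        self.rules = None

    def refresh_once(self):
        """One refresh attempt; the caller is assumed to invoke this every interval."""
        try:
            self.rules = self._fetch_rules()
            self._consecutive_failures = 0       # any success resets the counter
        except Exception as exc:
            self._consecutive_failures += 1
            if self._consecutive_failures >= self._max_failures:
                raise RefreshFailed(
                    f"{self._consecutive_failures} consecutive refreshes failed"
                ) from exc


def staleness_bound_seconds(interval_seconds, max_failures):
    """Longest time a subtask can keep running on stale rules.

    Assumption: a rule change lands right after a successful refresh, and the
    subtask either succeeds or fails the job by the max_failures-th attempt.
    """
    return interval_seconds * max_failures
```

Under these assumptions `staleness_bound_seconds(30, 3)` gives 90 seconds, which is consistent with the "< 2 minutes" figure in the thread. If the job were instead allowed three failures and only crashed on the fourth, the bound would be 120 seconds.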
> > > > > >
> > > > > > -- In summary, the current design is that JobManager tells all TaskManagers the latest rules through OperatorCoordinator, and will initiate a heartbeat to check whether the latest rules on each TaskManager are consistent. We will describe how to deal with the Failover scenario in more detail in the FLIP.
> > > > > >
> > > > > > Thanks for that. I think having the JobManager tell the TaskManagers the applicable rules would indeed end up being the best design.
> > > > > >
> > > > > > -- about the concerns around consistency raised by Martijn: I think a lot of those can be mitigated by using an event time timestamp from which the rules take effect. The reprocessing scenario, for example, is covered by this. If a pattern processor should become active as soon as possible, there will still be inconsistencies between Taskmanagers, but "as soon as possible" is vague anyway, which is why I think that's ok.
> > > > > >
> > > > > > I think an event timestamp is indeed a really important one. We also used that in my previous role, with the ruleActivationTimestamp compared to eventtime (well, actually we used Kafka logAppend time because eventtime wasn't always properly set so we used that time to overwrite the eventtime from the event itself).
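
The event-time activation idea discussed above can be sketched as follows. This is only an illustration, not FLIP-200 or Flink CEP code: `Rule`, `activation_ts` and `active_rules` are hypothetical names, and the sketch assumes each rule carries an activation timestamp in the same unit as the event timestamps.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Rule:
    name: str
    activation_ts: int  # event-time timestamp (e.g. millis) from which the rule applies


def active_rules(rules, event_ts):
    """Return the names of rules that apply to an event with the given event time.

    The decision depends only on the event's timestamp, not on when a subtask
    happened to receive the rule, so reprocessing the same events yields the
    same set of active rules.
    """
    return [rule.name for rule in rules if event_ts >= rule.activation_ts]
```

For example, with rules activating at 100 and 200, an event stamped 150 is matched only against the first rule on every subtask and on every replay. This only holds if a rule reaches a subtask before that subtask processes events past the rule's activation time, which is the remaining "as soon as possible" inconsistency mentioned in the thread.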
> > > > > > > > > > > > Best regards, > > > > > > > > > > > > Martijn > > > > > > > > > > > > On Mon, 20 Dec 2021 at 09:08, Konstantin Knauf <kna...@apache.org> > > > > > wrote: > > > > > > > > > > > > > Hi Nicholas, Hi Junfeng, > > > > > > > > > > > > > > about the concerns around consistency raised by Martijn: I think > > a > > > > lot > > > > > of > > > > > > > those can be mitigated by using an event time timestamp from > > which > > > > the > > > > > > > rules take effect. The reprocessing scenario, for example, is > > > covered > > > > > by > > > > > > > this. If a pattern processor should become active as soon as > > > > possible, > > > > > > > there will still be inconsistencies between Taskmanagers, but "as > > > > soon > > > > > as > > > > > > > possible" is vague anyway, which is why I think that's ok. > > > > > > > > > > > > > > about naming: The naming with "PatternProcessor" sounds good to > > me. > > > > > Final > > > > > > > nit: I would go for CEP#patternProccessors, which would be > > > consistent > > > > > with > > > > > > > CEP#pattern. > > > > > > > > > > > > > > I am not sure about one of the rejected alternatives: > > > > > > > > > > > > > > > Have each subtask of an operator make the update on their own > > > > > > > > > > > > > > - > > > > > > > > > > > > > > It is hard to achieve consistency. > > > > > > > - > > > > > > > > > > > > > > Though the time interval that each subtask makes the update > > > can > > > > > be > > > > > > > the same, the absolute time they make the update might be > > > > > different. > > > > > > > For > > > > > > > example, one makes updates at 10:00, 10:05, etc, while > > > another > > > > > does > > > > > > > it at > > > > > > > 10:01, 10:06. In this case the subtasks might never > > > processing > > > > > data > > > > > > > with > > > > > > > the same set of pattern processors. 
> > > > > > > > > > > > > > > > > > > > > I would have thought that it is quite easy to poll for the rules > > > from > > > > > each > > > > > > > Subtask at *about *the same time. So, this alone does not seem to > > > be > > > > > > > enough to rule out this option. I've looped in David Moravek to > > get > > > > his > > > > > > > opinion of the additional load imposed on the JM. > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > Konstantin > > > > > > > > > > > > > > On Mon, Dec 20, 2021 at 4:06 AM Nicholas Jiang < > > > > > nicholasji...@apache.org> > > > > > > > wrote: > > > > > > > > > > > > > > > Hi Yue, > > > > > > > > > > > > > > > > Thanks for your feedback of the FLIP. I have addressed your > > > > > questions and > > > > > > > > made a corresponding explanation as follows: > > > > > > > > > > > > > > > > -- About Pattern Updating. If we use > > PatternProcessoerDiscoverer > > > to > > > > > > > update > > > > > > > > the rules, will it increase the load of JM? For example, if the > > > > user > > > > > > > wants > > > > > > > > the updated rule to take effect immediately,, which means that > > we > > > > > need to > > > > > > > > set a shorter check interval or there is another scenario when > > > > users > > > > > > > rarely > > > > > > > > update the pattern, will the PatternProcessoerDiscoverer be in > > > most > > > > > of > > > > > > > the > > > > > > > > time Do useless checks ? Will a lazy update mode could be used, > > > > > which the > > > > > > > > pattern only be updated when triggered by the user, and do > > > nothing > > > > at > > > > > > > other > > > > > > > > times? > > > > > > > > > > > > > > > > PatternProcessoerDiscoverer is a user-defined interface to > > > discover > > > > > the > > > > > > > > PatternProcessor updates. 
Periodically checking the > > > > PatternProcessor > > > > > in > > > > > > > the > > > > > > > > database is a implementation of the PatternProcessoerDiscoverer > > > > > > > interface, > > > > > > > > which is that periodically querys all the PatternProcessor > > table > > > in > > > > > > > certain > > > > > > > > interval. This implementation indeeds has the useless checks, > > and > > > > > could > > > > > > > > directly integrates the changelog of the table. In addition, in > > > > > addition > > > > > > > to > > > > > > > > the implementation of periodically checking the database, there > > > are > > > > > other > > > > > > > > implementations such as the PatternProcessor that provides > > > Restful > > > > > > > services > > > > > > > > to receive updates. > > > > > > > > > > > > > > > > -- I still have some confusion about how Key Generating > > > Opertator > > > > > and > > > > > > > > CepOperator (Pattern Matching & Processing Operator) work > > > together. > > > > > If > > > > > > > > there are N PatternProcessors, will the Key Generating > > Opertator > > > > > > > generate N > > > > > > > > keyedStreams, and then N CepOperator would process each Key > > > > > separately ? > > > > > > > Or > > > > > > > > every CepOperator Task would process all patterns, if so, does > > > the > > > > > key > > > > > > > type > > > > > > > > in each PatternProcessor need to be the same? > > > > > > > > > > > > > > > > Firstly the Pattern Matching & Processing Operator is not the > > > > > CepOperator > > > > > > > > at present, because CepOperator mechanism is based on the > > > NFAState. > > > > > > > > Secondly if there are N PatternProcessors, the Key Generating > > > > > Opertator > > > > > > > > combines all the keyedStreams with keyBy() operation, thus the > > > > > Pattern > > > > > > > > Matching & Processing Operator would process all the patterns. 
> > In > > > > > other > > > > > > > > words, the KeySelector of the PatternProcessor is used for the > > > Key > > > > > > > > Generating Opertator, and the Pattern and > > PatternProceessFunction > > > > of > > > > > the > > > > > > > > PatternProcessor are used for the Pattern Matching & Processing > > > > > Operator. > > > > > > > > Lastly the key type in each PatternProcessor is the same, > > > regarded > > > > as > > > > > > > > Object type. > > > > > > > > > > > > > > > > -- Maybe need to pay attention to it when implementing it .If > > > some > > > > > > > Pattern > > > > > > > > has been removed or updated, will the partially matched results > > > in > > > > > > > > StateBackend would be clean up or We rely on state ttl to clean > > > up > > > > > these > > > > > > > > expired states. > > > > > > > > > > > > > > > > If certain Pattern has been removed or updated, the partially > > > > matched > > > > > > > > results in StateBackend would be clean up until the next > > > > checkpoint. > > > > > The > > > > > > > > partially matched result doesn't depend on the state ttl of the > > > > > > > > StateBackend. > > > > > > > > > > > > > > > > 4. Will the PatternProcessorManager keep all the active > > > > > PatternProcessor > > > > > > > > in memory? We have also Support Multiple Rule and Dynamic Rule > > > > > Changing. > > > > > > > > But we are facing such a problem, some users’ usage scenarios > > are > > > > > that > > > > > > > they > > > > > > > > want to have their own pattern for each user_id, which means > > that > > > > > there > > > > > > > > could be thousands of patterns, which would make the > > performance > > > of > > > > > > > Pattern > > > > > > > > Matching very poor. We are also trying to solve this problem. > > > > > > > > > > > > > > > > The PatternProcessorManager keeps all the active > > PatternProcessor > > > > in > > > > > > > > memory. 
For scenarios that they want to have their own pattern > > > for > > > > > each > > > > > > > > user_id, IMO, is it possible to reduce the fine-grained pattern > > > of > > > > > > > > PatternProcessor to solve the performance problem of the > > Pattern > > > > > > > Matching, > > > > > > > > for example, a pattern corresponds to a group of users? The > > > > scenarios > > > > > > > > mentioned above need to be solved by case by case. > > > > > > > > > > > > > > > > Best, > > > > > > > > Nicholas Jiang > > > > > > > > > > > > > > > > On 2021/12/17 11:43:10 yue ma wrote: > > > > > > > > > Glad to see the Community's progress in Flink CEP. After > > > reading > > > > > this > > > > > > > > Flip, > > > > > > > > > I have few questions, would you please take a look ? > > > > > > > > > > > > > > > > > > 1. About Pattern Updating. If we use > > > PatternProcessoerDiscoverer > > > > to > > > > > > > > update > > > > > > > > > the rules, will it increase the load of JM? For example, if > > the > > > > > user > > > > > > > > wants > > > > > > > > > the updated rule to take effect immediately,, which means > > that > > > we > > > > > need > > > > > > > to > > > > > > > > > set a shorter check interval or there is another scenario > > when > > > > > users > > > > > > > > > rarely update the pattern, will the > > PatternProcessoerDiscoverer > > > > be > > > > > in > > > > > > > > most > > > > > > > > > of the time Do useless checks ? Will a lazy update mode could > > > be > > > > > used, > > > > > > > > > which the pattern only be updated when triggered by the user, > > > and > > > > > do > > > > > > > > > nothing at other times ? > > > > > > > > > > > > > > > > > > 2. I still have some confusion about how Key Generating > > > > > Opertator and > > > > > > > > > CepOperator (Pattern Matching & Processing Operator) work > > > > > together. 
If > > > > > > > > > there are N PatternProcessors, will the Key Generating > > > Opertator > > > > > > > > generate N > > > > > > > > > keyedStreams, and then N CepOperator would process each Key > > > > > separately > > > > > > > ? > > > > > > > > Or > > > > > > > > > every CepOperator Task would process all patterns, if so, > > does > > > > the > > > > > key > > > > > > > > type > > > > > > > > > in each PatternProcessor need to be the same ? > > > > > > > > > > > > > > > > > > 3. Maybe need to pay attention to it when implementing it .If > > > > some > > > > > > > > Pattern > > > > > > > > > has been removed or updateed ,will the partially matched > > > results > > > > > in > > > > > > > > > StateBackend would be clean up or We rely on state ttl to > > clean > > > > up > > > > > > > these > > > > > > > > > expired states. > > > > > > > > > > > > > > > > > > 4. Will the PatternProcessorManager keep all the active > > > > > > > PatternProcessor > > > > > > > > in > > > > > > > > > memory ? We have also Support Multiple Rule and Dynamic Rule > > > > > Changing . > > > > > > > > > But we are facing such a problem, some users’ usage scenarios > > > are > > > > > that > > > > > > > > they > > > > > > > > > want to have their own pattern for each user_id, which means > > > that > > > > > there > > > > > > > > > could be thousands of patterns, which would make the > > > performance > > > > of > > > > > > > > Pattern > > > > > > > > > Matching very poor. We are also trying to solve this problem. > > > > > > > > > > > > > > > > > > Yunfeng Zhou <flink.zhouyunf...@gmail.com> 于2021年12月10日周五 > > > > 19:16写道: > > > > > > > > > > > > > > > > > > > Hi all, > > > > > > > > > > > > > > > > > > > > I'm opening this thread to propose the design to support > > > > multiple > > > > > > > rule > > > > > > > > & > > > > > > > > > > dynamic rule changing in the Flink-CEP project, as > > described > > > in > > > > > > > > FLIP-200 > > > > > > > > > > [1] > > > > > > > > > > . 
> > > > > > > > > > > > > > > > > > > > Currently Flink CEP only supports having a single pattern > > > > inside > > > > > a > > > > > > > > > > CepOperator and does not support changing the pattern > > > > > dynamically. In > > > > > > > > order > > > > > > > > > > to reduce resource consumption and to experience shorter > > > > downtime > > > > > > > > during > > > > > > > > > > pattern updates, there is a growing need in the production > > > > > > > environment > > > > > > > > that > > > > > > > > > > expects CEP to support having multiple patterns in one > > > operator > > > > > and > > > > > > > to > > > > > > > > > > support dynamically changing them. Therefore I propose to > > add > > > > > certain > > > > > > > > > > infrastructure as described in FLIP-200 to support these > > > > > > > > functionalities. > > > > > > > > > > > > > > > > > > > > Please feel free to reply to this email thread. Looking > > > forward > > > > > to > > > > > > > your > > > > > > > > > > feedback! > > > > > > > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195730308 > > > > > > > > > > > > > > > > > > > > Best regards, > > > > > > > > > > > > > > > > > > > > Yunfeng > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > > > > > Konstantin Knauf > > > > > > > > > > > > > > https://twitter.com/snntrable > > > > > > > > > > > > > > https://github.com/knaufk > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > Konstantin Knauf > > > > https://twitter.com/snntrable > > > > https://github.com/knaufk > > >