Hi Becket,

I might be missing something but having to define interfaces/formats for
the CEP patterns should be necessary for either approach. The OC approach
needs to receive and understand the pattern data from somewhere as well and
will probably also have to deal with evolving formats. Hence, I believe
that this work wouldn't be wasted.

I might misjudge the willingness of our users to do some extra set up work,
but I think that some of them would already be happy with state 1.

My understanding so far was that the OC approach also requires the CEP
operator infrastructure (making the operator accept new patterns) work that
I proposed to do as a first step. The only difference is where the new
patterns/commands are coming from. If we design this correctly, then this
should change only very little depending on whether you read from a
side-input or from an OC.

Another benefit of downscoping the FLIP is to make it more realistic to be
completed. Smaller and incremental steps are usually easier to realize. If
we now say that this FLIP requires a general purpose user controlled
control plane that gives you hard guarantees, then I am pretty sure that
this will take at least half a year.

Cheers,
Till

On Thu, Jan 6, 2022 at 4:45 AM Becket Qin <becket....@gmail.com> wrote:

> Thanks for the explanation, Till. I like the idea, but have a question
> about the first step.
>
> After the first step, would users be able to actually use the dynamic
> patterns in CEP?
>
> In the first step you mentioned, the commands and formats for a new CEP
> pattern seem related to how users would ingest the commands. If we go with
> the OC, these commands and formats would become internal interfaces. The
> end users would just use the REST API, or in the beginning, implement a
> Java plugin of dynamic pattern provider. In this case our focus would be on
> designing a good plugin interface. On the other hand, if we go with the
> side-input, users would need to know the command format so they can send
> the commands to the CEP operator. Therefore we need to think about stuff
> like versioning, request validation and backwards compatibility.
>
> Also, because the public interface is all about how the users can ingest
> the dynamic patterns. It seems we still need to figure that out before we
> can conclude the FLIP.
>
> Assuming the first step closes this FLIP and after that users would be able
> to use the CEP dynamic pattern, are you suggesting the following?
>
> 1. We will design the commands and format of CEP dynamic pattern, and also
> how CEP operators take them into effect. This design would assume that
> users can send the commands directly to the CEP operator via side-input. So
> the protocol would take versioning and format evolution into account. After
> the first step, the users CAN make dynamic pattern work with the help from
> some external dependencies and maintenance effort.
>
> 2. Discuss about a control plane from the use case of CEP dynamic pattern,
> and let CEP dynamic pattern use that control plane if we eventually think
> that is the right way to go.
>
> Assuming that we are not going to stop after step 1 is done, will the end
> state be that the CEP dynamic pattern supports both approaches? It is not
> clear to me how this would work. For example, will a pattern ingested from
> side-input be managed by OC as well? If more and more users pick the more
> ergonomic way, would the side-input option just die out? In that case, will
> we deprecate that?
>
> My main concern is that the time and work we invest in step 1 will be
> thrown away. Not only that, at the end of step 1, in order to actually use
> the feature and make it production ready, there is a lot of work for the
> users to do. All that work may also become in vain. So it would be good if
> we can avoid that by having a clean approach to begin with. What do you
> think?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Wed, Jan 5, 2022 at 11:35 PM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
> > I think I would scope the effort slightly differently. Note that I might
> be
> > missing some requirements or overlook something.
> >
> > 1. Enable Flink to support CEP dynamic patterns
> >
> > Here I would define the commands and formats for a new CEP pattern. Then
> I
> > would extend the CEP operator to understand these commands so that we can
> > change the CEP patterns dynamically. This should give the building blocks
> > to set up a job where you can change the CEP patterns by reading from an
> > arbitrary side input the CEP pattern commands. Note that it will be the
> > responsibility of the user to ingest the data somehow. This is nothing
> > Flink is concerned with at this time.
> >
> > 2. Think about how to make the CEP dynamic pattern feature more ergonomic
> >
> > Here we can pick up the discussion about OC and having a REST ingestion
> > endpoint in Flink vs. offering tools that live outside of Flink itself. I
> > could imagine that this could become a separate small project that builds
> > upon Flink, for example.
> >
> > Cheers,
> > Till
> >
> > On Wed, Jan 5, 2022 at 2:19 PM Becket Qin <becket....@gmail.com> wrote:
> >
> > > Hi Till,
> > >
> > > Thanks for the prompt reply. Like you said, we are indeed using the
> > dynamic
> > > CEP pattern use case to test the existing primitives in Flink to see if
> > > they can meet the requirements. I fully understand the concern of
> > > exposing OC as a user interface. Meanwhile I see CEP dynamic patterns
> as
> > a
> > > good opportunity to battle test and enhance the OC as a user facing
> > control
> > > plane which is currently missing. After all, there is no better person
> > than
> > > ourselves to try it out first.
> > >
> > > It is not clear to me whether it is worth continuing the effort of
> > > supporting dynamic CEP pattern without concluding the control plane
> > > discussion. Let's say we have a CEP job reading from Kafka. To make
> this
> > > work with side-input, a few things need to be done.
> > >
> > >    1. In order to support dynamic patterns, users would create another
> > >    Kafka topic as side-input to receive dynamic patterns.
> > >    2. In order to insert dynamic patterns, users would use a separate
> web
> > >    server that is provided by us as a separate tool. The web server
> takes
> > > http
> > >    requests and sends dynamic pattern records to Kafka via a Kafka sink
> > > (using
> > >    a KafkaProducer is likely simpler here, though).
> > >    3. Regarding querying the running dynamic patterns, given Kafka is
> not
> > >    queryable, users would probably introduce a database and insert the
> > >    patterns there so they can query the running patterns. Maybe this
> > could
> > > be
> > >    done by the CEP operator side-output, so there is less chance of
> > >    inconsistency between Kafka and the database.
> > >    4. If the Flink job is to be stopped, the dynamic pattern Kafka
> topic
> > >    needs to be deleted, the companion web server also needs to be
> stopped
> > >    (assuming it is not shared with other CEP jobs), and the database
> > table
> > >    storing the dynamic pattern needs to be dropped.
> > >
> > > Please correct me if I misunderstood something, but this seems quite
> > > involved. Moreover, all the work here is going to be thrown away after
> we
> > > have OC as a decent user facing control plane in place. And we will
> > likely
> > > have a backwards incompatible API change here. Given that, I am
> wondering
> > > if we should wait until the OC discussion concludes before moving on
> with
> > > the dynamic patterns?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > >
> > > On Wed, Jan 5, 2022 at 5:53 PM Till Rohrmann <trohrm...@apache.org>
> > wrote:
> > >
> > > > Thanks for the detailed explanation Becket.
> > > >
> > > > Do you think that an additional dependency is a deal breaker for
> people
> > > to
> > > > use dynamic CEP patterns? At the very least people have to operate
> some
> > > > kind of storage/queue system from which the CEP job can read anyway.
> > > Maybe
> > > > it could be good enough to provide a REST endpoint (as a separate
> tool)
> > > > that can be instantiated with a Flink sink to ingest REST requests
> > > > into some queue. A general concern I have is that by making the JM a
> > REST
> > > > ingestion point for data will push another responsibility to Flink
> and
> > > > increase the surface area further.
> > > >
> > > > For how to handle data that cannot be processed by patterns, I think
> > > there
> > > > also exist other solutions. I could imagine that users could define
> > > > different failover strategies. E.g. one could simply ignore the
> record,
> > > the
> > > > pattern could get deactivated on the affected TM or the processing
> > fails.
> > > >
> > > > Maybe we are coupling the dynamic CEP pattern effort too much on
> where
> > > the
> > > > new patterns come from. Maybe we can split these efforts into
> > supporting
> > > > dynamic CEP patterns on the TM reading from some source and then fork
> > off
> > > > the discussion about introducing a user controlled control plane to
> > > Flink.
> > > > That way we wouldn't block this effort and could discuss more about
> the
> > > > exact properties such a user control plane would need to have. What
> do
> > > you
> > > > think?
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Wed, Jan 5, 2022 at 7:18 AM Becket Qin <becket....@gmail.com>
> > wrote:
> > > >
> > > > > Hi Till,
> > > > >
> > > > > Thanks for the comments and questions. To be clear, I am not saying
> > > that
> > > > > the side-input stream does not work for the dynamic pattern update
> > use
> > > > > case. But I think OC is a better solution. The design goal for CEP
> > > > dynamic
> > > > > pattern is not only make it work, but also make it user friendly
> and
> > > > > extensible. So as a user, I expect the following:
> > > > >
> > > > > - Use CEP with dynamic pattern update without depending on external
> > > > > systems. e.g. updating the pattern by directly talking to the Flink
> > job
> > > > > itself.
> > > > > - Some sort of isolation between patterns. e.g. a pattern update
> > > failure
> > > > > won't cause other running patterns to fail.
> > > > > - Easy to query the currently running patterns in the Flink job.
> > > > > - Extensible to add new features. e.g. apply some pattern to a
> subset
> > > of
> > > > > subtasks. Disable dynamic pattern update during a service window,
> > etc.
> > > > >
> > > > > It looks to me that OC is a more promising way to to achieve the
> > above.
> > > > >
> > > > > Please also see the reply to your questions inline below.
> > > > >
> > > > > > You mentioned that a TM might find out that a pattern is invalid
> > and
> > > > then
> > > > > > it could use the 2-way communication with the JM to tell the
> other
> > > TMs
> > > > > that
> > > > > > the pattern is invalid. How exactly would a TM detect that a
> > pattern
> > > is
> > > > > > invalid at data processing time? And assuming that this is
> possible
> > > > for a
> > > > > > single TM, why can't the other TMs not detect this problem? Is it
> > > > because
> > > > > > the validity of patterns might depend on data that is different
> > > between
> > > > > > TMs?
> > > > >
> > > > >
> > > > > Yes, the TMs are processing different records, so some may
> encounter
> > > > issue
> > > > > while others are running fine, for example, a field may be missing
> > > from a
> > > > > record while the pattern requires it. Even if all the TMs have
> > detected
> > > > the
> > > > > problem, without a channel to report back the issue, the only thing
> > TMs
> > > > can
> > > > > do is either to throw exception and stop processing.
> > > > >
> > > > > You also mentioned that deep-learning-on-flink could benefit from
> an
> > OC
> > > > > > control plane but I couldn't find the referenced link. Would
> > > > > > deep-learning-on-flink like to use OCs as a kind of parameter
> > server?
> > > > > Could
> > > > > > you elaborate on deep-learning-on-flink's usage of such a
> feature?
> > > > >
> > > > >
> > > > > deep-learning-on-flink[1] has a general ML cluster abstraction that
> > can
> > > > run
> > > > > TF / PyTorch etc. This cluster has a ML master role to manage the
> > > > > liveliness of all the ML workers which are python processes running
> > > > > side-by-side with TMs. The ML workers can be PS or Worker of
> > > TensorFlow.
> > > > > For example, if 10 TF worker and 5 TF PS nodes are needed, a Flink
> > UDF
> > > > > operator of parallelism 10 and a Flink Source operator with
> > parallelism
> > > > = 5
> > > > > will be created to run the TF worker and TF PS respectively. All
> the
> > 15
> > > > > nodes running either PS or Worker are managed by the ML master.
> Prior
> > > to
> > > > > OC, the implementation was having a Source operator with
> parallelism
> > =
> > > 1
> > > > to
> > > > > run the master; let the ML workers register themselves to the an
> > > external
> > > > > ZK so the master can discover them. So basically users have to
> build
> > > > their
> > > > > own control plane. With OC, at very least, we no longer need ZK
> > > anymore.
> > > > >
> > > > > Concerning reprocessing command history and repeatable processing
> > > > results I
> > > > > > think there is actually quite a big difference between using a
> > Flink
> > > > > source
> > > > > > vs. offering a REST server that can receive command that are
> > > > distributed
> > > > > > via the OCs. In the former case we can rely on the external
> system
> > to
> > > > > > persist commands whereas in the latter approach we/users have to
> > > > > implement
> > > > > > a storage solution on their own (in the simplest case users will
> > > > probably
> > > > > > store everything in state which might grow indefinitely if stored
> > as
> > > a
> > > > > > changelog).
> > > > >
> > > > >
> > > > > It is true that a Flink Source is easier to provide a change log of
> > > > > patterns. But there are also downsides. The most prominent one is
> > that
> > > in
> > > > > order to update the dynamic pattern, users have to run an external
> > > system
> > > > > for the source to read. Or users have to implement a REST Source
> > taking
> > > > > command directly like a web server. In that case, the service
> > discovery
> > > > is
> > > > > again a problem and needs external dependency.
> > > > > On the other hand, it is not so difficult for OC to store a change
> > log
> > > > > history. Assuming there are 100 pattern updates / day for a CEP job
> > and
> > > > one
> > > > > year of history should be stored. Let's say each pattern change log
> > > entry
> > > > > is 1K bytes. The pattern update change log for the entire year is
> > just
> > > 34
> > > > > MB, which seems quite small. And users can also query the patterns
> in
> > > > > effect via the same endpoint. I think this would provide a good
> > > > out-of-box
> > > > > experience in the vast majority of cases. No external dependencies
> > and
> > > a
> > > > > single REST endpoint. In rare cases where there is a large amount
> of
> > > > > pattern update log. External systems can still be leveraged. One
> > > > potential
> > > > > issue might be that a command that has been received but not
> > > checkpointed
> > > > > yet may got lost if the job fails over. This can be mitigated by
> > > allowing
> > > > > OC to ask JM to trigger a checkpoint (not there yet but may worth
> > > > > thinking), or simply introduce a *PendingCommit* state for a
> pattern
> > > > > indicating it's not committed yet. In the worst case, we can always
> > > > > fallback to store patterns in external systems such as a database.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/flink-extended/dl-on-flink/tree/master/deep-learning-on-flink
> > > > >
> > > > >
> > > > > On Tue, Jan 4, 2022 at 4:45 PM Till Rohrmann <trohrm...@apache.org
> >
> > > > wrote:
> > > > >
> > > > > > Hi Becket,
> > > > > >
> > > > > > Thanks for the explanation. While I do agree that a general 2-way
> > > > > > communication pattern would be nice to have, I also believe that
> > this
> > > > > > approach is probably at least one magnitude more complex to
> realize
> > > > than
> > > > > > the side-input approach. Therefore, I really would like to
> > understand
> > > > why
> > > > > > such a mechanism is required for the CEP use case.
> > > > > >
> > > > > > You mentioned that a TM might find out that a pattern is invalid
> > and
> > > > then
> > > > > > it could use the 2-way communication with the JM to tell the
> other
> > > TMs
> > > > > that
> > > > > > the pattern is invalid. How exactly would a TM detect that a
> > pattern
> > > is
> > > > > > invalid at data processing time? And assuming that this is
> possible
> > > > for a
> > > > > > single TM, why can't the other TMs not detect this problem? Is it
> > > > because
> > > > > > the validity of patterns might depend on data that is different
> > > between
> > > > > > TMs?
> > > > > >
> > > > > > You also mentioned that deep-learning-on-flink could benefit from
> > an
> > > OC
> > > > > > control plane but I couldn't find the referenced link. Would
> > > > > > deep-learning-on-flink like to use OCs as a kind of parameter
> > server?
> > > > > Could
> > > > > > you elaborate on deep-learning-on-flink's usage of such a
> feature?
> > > > > >
> > > > > > Concerning reprocessing command history and repeatable processing
> > > > > results I
> > > > > > think there is actually quite a big difference between using a
> > Flink
> > > > > source
> > > > > > vs. offering a REST server that can receive command that are
> > > > distributed
> > > > > > via the OCs. In the former case we can rely on the external
> system
> > to
> > > > > > persist commands whereas in the latter approach we/users have to
> > > > > implement
> > > > > > a storage solution on their own (in the simplest case users will
> > > > probably
> > > > > > store everything in state which might grow indefinitely if stored
> > as
> > > a
> > > > > > changelog).
> > > > > >
> > > > > > Cheers,
> > > > > > Till
> > > > > >
> > > > > > On Tue, Jan 4, 2022 at 6:41 AM Becket Qin <becket....@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Hi David,
> > > > > > >
> > > > > > > Thanks for sharing your thoughts. Some quick reply to your
> > > comments:
> > > > > > >
> > > > > > > We're still talking about the "web server based"
> > > > > > > > pattern_processor_discoverer, but what about other use cases?
> > One
> > > > of
> > > > > my
> > > > > > > big
> > > > > > > > concerns is that user's can not really reuse any part of the
> > > Flink
> > > > > > > > ecosystem to implement the discovery logic. For example if
> they
> > > > want
> > > > > to
> > > > > > > > read patterns from Kafka topic, they need to roll their own
> > > > > discoverer
> > > > > > > > based on the vanilla Kafka client. If we're talking about
> > > > > > extensibility,
> > > > > > > > should we also make sure that the existing primitives can be
> > > > reused?
> > > > > > >
> > > > > > >
> > > > > > > KafkaSource is actually more complicated than vanilla
> > KafkaConsumer
> > > > > from
> > > > > > > the perspective of consuming Kafka messages. The KafkaSource
> just
> > > > helps
> > > > > > > make it easier for Flink jobs to process these messages. In
> fact,
> > > > > > > KafkaConsumer is more powerful than KafkaSource in terms of
> > talking
> > > > to
> > > > > > > Kafka. So we are comparing [KafkaConsumer +
> OperatorCoordinator]
> > > and
> > > > > > > [KafkaSource + side-input], not [KafkaConsumer + side-input].
> > > > > > >
> > > > > > > This can be done for the side-input as well by filtering
> invalid
> > > > > patterns
> > > > > > > > before the broadcast. You can also send the invalid patterns
> to
> > > any
> > > > > > side
> > > > > > > > output you want. I have a feeling that we're way too attached
> > to
> > > > the
> > > > > > REST
> > > > > > > > server use case in this discussion. I agree that for that
> case,
> > > > this
> > > > > > > > solution is the most straightforward one.
> > > > > > >
> > > > > > >
> > > > > > > Depending on whether the invalidity is discovered in pattern
> > > > definition
> > > > > > > time or data processing time. e.g. A valid pattern with invalid
> > > data
> > > > > > which
> > > > > > > fails to process can only be detected in data processing time.
> So
> > > the
> > > > > > > pattern won't be filtered out before the broadcast.
> > > > > > >
> > > > > > > I agree that 2-way communication in the "data-flow like" API is
> > > > tricky,
> > > > > > > > because it requires cycles / iterations, which are still not
> > > really
> > > > > > > solved
> > > > > > > > (for a good reason, it's really tough nut to crack). This
> makes
> > > me
> > > > > > think
> > > > > > > > that the OC may be bit of a "incomplete" workaround for not
> > > having
> > > > > > fully
> > > > > > > > working support for iterations.
> > > > > > >
> > > > > > >
> > > > > > > For example I'm not really confident that the checkpointing of
> > the
> > > OC
> > > > > > works
> > > > > > > > correctly right now, because it doesn't seem to require
> > > checkpoint
> > > > > > > barrier
> > > > > > > > alignment as the regular stream inputs. We also don't have a
> > > proper
> > > > > > > support
> > > > > > > > for watermarking (this is again tricky, because of the
> cycle).
> > > > > > >
> > > > > > >
> > > > > > > If we decide to go down this road, should we first address some
> > of
> > > > > these
> > > > > > > > limitations?
> > > > > > >
> > > > > > > I agree. OC is proven to be useful and we should think about
> how
> > to
> > > > > > enhance
> > > > > > > it instead of not using it.
> > > > > > >
> > > > > > > If I understand that correctly, this means only the LATEST
> state
> > of
> > > > the
> > > > > > > > patterns (in other words - patterns that are currently in
> use).
> > > Is
> > > > > this
> > > > > > > > really sufficient for historical re-processing? Can someone
> for
> > > > > example
> > > > > > > > want re-process the data in more of a "temporal join"
> fashion?
> > > Also
> > > > > > AFAIK
> > > > > > > > historical processing in combination with "coordinator
> > > checkpoints"
> > > > > is
> > > > > > > not
> > > > > > > > really something that we currently support of the box, are
> > there
> > > > any
> > > > > > > plans
> > > > > > > > on tackling this (my other concern is that this should not go
> > > > against
> > > > > > the
> > > > > > > > "unified batch & stream processing" efforts)?
> > > > > > >
> > > > > > > I think in this case, the state would contain the entire
> pattern
> > > > update
> > > > > > > history. Not the final state, but the change log. In fact, it
> is
> > > not
> > > > > > > possible to guarantee the correct result unless you have the
> > entire
> > > > > > change
> > > > > > > log. Even with side-input, there still could be late arrivals
> in
> > > the
> > > > > > > pattern update stream, and the only way to correct it is to
> > either
> > > > have
> > > > > > all
> > > > > > > the data reprocessed (with all the pattern change log loaded
> > > upfront)
> > > > > or
> > > > > > > having retraction support.
> > > > > > >
> > > > > > > I can imagine that if this should be a concern, we could move
> the
> > > > > > execution
> > > > > > > > of the OC to the task managers. This also makes me thing,
> that
> > we
> > > > > > > shouldn't
> > > > > > > > make any strong assumptions that the OC will always run in
> the
> > > > > > JobManager
> > > > > > > > (this is especially relevant for the embedded web-server use
> > > case).
> > > > > > >
> > > > > > > This is a very good point. I think we should do this. The
> reason
> > OC
> > > > is
> > > > > in
> > > > > > > JM is based on the assumption that control plane usually have
> > > little
> > > > > > > traffic. But if that is not the case, we should move the OC out
> > of
> > > > JM,
> > > > > > > maybe to TM.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > I think the key topic we are discussing is what the Flink
> control
> > > > plane
> > > > > > > should look like, and CEP is just an example use case. There
> are
> > > > > > basically
> > > > > > > two options:
> > > > > > >
> > > > > > > 1. Using the side-input / broadcast stream
> > > > > > > 2. Using the OperatorCoordinator.
> > > > > > >
> > > > > > > Although we are having this discussion under the context of
> > > > supporting
> > > > > > CEP
> > > > > > > dynamic pattern update, it would be really helpful to reach
> > > agreement
> > > > > on
> > > > > > > the design principle in general of how Flink should handle
> > similar
> > > > > > external
> > > > > > > control demands, a.k.a how to provide a decent control plane in
> > > Flink
> > > > > to
> > > > > > > the users.
> > > > > > >
> > > > > > > Here is my take on these two options, based on the requirements
> > of
> > > a
> > > > > > > control plane.
> > > > > > >
> > > > > > > *Communication Requirements*
> > > > > > > Flink has two kinds of control planes - in-band and
> out-of-band.
> > > > > > > In-band control plane is like watermark and checkpoint marker.
> It
> > > > > > requires
> > > > > > > the entire DAG to be in a consistent state, so the control
> > messages
> > > > > flow
> > > > > > > with the data in one direction. This is a one-way control flow.
> > > > > > > Examples of out-of-band control plane is OC and JM/TM
> > > communication.
> > > > > They
> > > > > > > do not require a global consistency or awareness across the
> > entire
> > > > DAG.
> > > > > > >
> > > > > > > Before OC, neither of these two kinds of control plain was
> > exposed
> > > to
> > > > > the
> > > > > > > users. OC allows the users to leverage the out-of-band control
> > > plane
> > > > so
> > > > > > > they can coordinate across subtasks. This is a more widely used
> > > > control
> > > > > > > flow from user defined operator perspective which requires
> 2-way
> > > > > > > communication.
> > > > > > >
> > > > > > >    - side-input stream is one-way communication. Moreover, I
> > don't
> > > > > think
> > > > > > >    iteration is the right way to address this. Unlike the
> > > abstraction
> > > > > of
> > > > > > > data
> > > > > > >    processing which is a DAG, the control plane may require
> > > > > communication
> > > > > > >    between arbitrary two operators in different orders. Using
> > > > > iterations
> > > > > > > may
> > > > > > >    end up with an explosion of feedback edges that are not only
> > > > > > > complicated to
> > > > > > >    express, but also hard to debug and maintain.
> > > > > > >    - OC provides a decent 2-way communication mechanism.
> > > > > > >
> > > > > > >
> > > > > > > *Cost for low traffic use case*
> > > > > > > An important difference between data channel and control
> channel
> > is
> > > > > > traffic
> > > > > > > pattern. Unlike data plane, the control plane is usually idle
> for
> > > the
> > > > > > vast
> > > > > > > majority of the time with occasional control messages.
> > > > > > >
> > > > > > >    - One argument to use the side-input / broadcast stream was
> > that
> > > > > users
> > > > > > >    can reuse the existing connectors. However, most Flink
> sources
> > > are
> > > > > > > designed
> > > > > > >    to handle large throughput of traffic that maintains all the
> > > > > threads,
> > > > > > >    network connections, etc. Not to say the external resources
> > such
> > > > as
> > > > > > > Kafka
> > > > > > >    topics to store the commands.
> > > > > > >    - OC can have a much lower constant cost.
> > > > > > >
> > > > > > >
> > > > > > > *Easy to use*
> > > > > > > Personally speaking, I think the most simple and user-friendly
> > > > > interface
> > > > > > is
> > > > > > > a REST API that accepts control commands.
> > > > > > >
> > > > > > >    - One proposal at this point is to have a separate web
> server
> > > > > > >    independent of the Flink job. In that case, should users
> > > implement
> > > > > > that
> > > > > > > web
> > > > > > >    server? In order to pass the command to the Flink job, does
> > that
> > > > > mean
> > > > > > > users
> > > > > > >    should implement a Flink source that can read the commands
> > from
> > > > the
> > > > > > web
> > > > > > >    server? Or the web server would send the command to
> something
> > > like
> > > > > > Kafka
> > > > > > >    and let the Flink side-input read from there. There seems to
> > be
> > > a
> > > > > lot
> > > > > > > for
> > > > > > >    the users to do. And it seems an unnecessarily complicated
> > > system
> > > > to
> > > > > > > build
> > > > > > >    to send occasional control messages to a Flink job.
> > > > > > >    - OC can simply be reached via the JM REST API.
> > > > > > >       - e.g.
> http://JM_IP/OperatorEvents?OP_NAME=xxx&Content=xxx
> > > > > > >       <http://jm_ip/OperatorEvents?OP_NAME=xxx&Content=xxx>
> > would
> > > > > simply
> > > > > > >       send the Content to the OC of OP_NAME.
> > > > > > >
> > > > > > > *Work well with checkpoints*
> > > > > > >
> > > > > > >    - side-input streams works well with checkpoint, thanks to
> its
> > > > > > >    compatibility to the DAG abstraction.
> > > > > > >    - The OperatorCoordinator requires more careful design to
> work
> > > > well
> > > > > > with
> > > > > > >    checkpoints. The complexity mostly comes from the 2-way
> > > > > communication.
> > > > > > > At
> > > > > > >    this point the OperatorCoordinator message delivery semantic
> > may
> > > > not
> > > > > > be
> > > > > > > as
> > > > > > >    robust as side-input streams. However, I think this is
> mainly
> > > due
> > > > to
> > > > > > >    potential bugs in implementation instead of fundamental
> design
> > > > > > problems.
> > > > > > >    Basically the checkpoint semantic should be:
> > > > > > >       - The JM will first checkpoint all the
> OperatorCoordinator,
> > > > more
> > > > > > >       specifically for each OperatorCoordinator,
> > > > > > >          - 1) JM blocks the input queue to stop receiving new
> > > events
> > > > > from
> > > > > > >          the subtasks.
> > > > > > >          - 2) let all the operator coordinator finish
> processing
> > > all
> > > > > the
> > > > > > >          pending events in the OC input queue.
> > > > > > >          - 3) flush the OC output queue until all the events
> sent
> > > are
> > > > > > >          acknowledged.
> > > > > > >          - 3) unblock the input queue, but block the output
> queue
> > > > until
> > > > > > all
> > > > > > >          the subtasks of the OC finish checkpoints.
> > > > > > >       - The JM starts a normal checkpoint starting from the
> > Source.
> > > > > > >          - If an operator has a coordinator, and it has sent
> some
> > > > > events
> > > > > > to
> > > > > > >          the OC but has not been acknowledged. These events
> need
> > to
> > > > be
> > > > > > > put into the
> > > > > > >          subtasks' checkpoint.
> > > > > > >       - After all the subtasks of an operator finishes
> > checkpoint,
> > > > the
> > > > > > >       output queue of the corresponding OC can be unblocked.
> > > > > > >    It looks that the above protocol would let OC have a clear
> > > > > checkpoint
> > > > > > >    semantic. And this complexity seems must for 2-way
> > > communication.
> > > > I
> > > > > > > think
> > > > > > >    we are able to hide the above protocol from users by having
> an
> > > > > > abstract
> > > > > > > OC
> > > > > > >    implementation.
> > > > > > >
> > > > > > >
> > > > > > > *Reprocess command history and repeatable processing results*
> > > > > > > This seems not something generally required for a control
> plane.
> > > But
> > > > if
> > > > > > the
> > > > > > > control plane changes the data processing behavior, there might
> > be
> > > > some
> > > > > > use
> > > > > > > case. Just like the CEP pattern update. In order to do this,
> > users
> > > > will
> > > > > > > need to provide a full command history to the operators. That
> > > > consists
> > > > > of
> > > > > > > two steps:
> > > > > > >
> > > > > > >    1. get the full command history.  (Flink Source V.S. Custom
> > way
> > > to
> > > > > > >    retrieve command history.)
> > > > > > >    2. send them to the operators. (Side-input V.S. OC/Operator
> > > > > > >    communication)l
> > > > > > >
> > > > > > > I actually don't feel much difference between these two
> options.
> > I
> > > am
> > > > > not
> > > > > > > sure if a Flink source is necessarily simpler than the user
> > > > implemented
> > > > > > > logic. For example, if all the changes are just written in a
> > > database
> > > > > > > table. From the user's perspective, I feel fine with querying
> the
> > > DB
> > > > by
> > > > > > > myself using something like JDBC (or use KafkaConsumer to read
> > the
> > > > > > pattern
> > > > > > > history from a Kafka topic) and send all the pattern change
> > history
> > > > to
> > > > > > the
> > > > > > > operators before they start to process the actual events.
> > > > > > >
> > > > > > > *Stability impact to the Flink Job*
> > > > > > > I agree this is a valid concern. There is only one JM and
> > currently
> > > > OC
> > > > > > runs
> > > > > > > there, which may potentially cause JM crash and result in job
> > > > failure.
> > > > > > > Running in TM may have a less impact. So I think it makes sense
> > to
> > > > run
> > > > > OC
> > > > > > > in TM. However, this is more of how we can implement OC to make
> > it
> > > > more
> > > > > > > robust. It does not forfeit the other semantic and user
> > experience
> > > > > > benefits
> > > > > > > mentioned above.
> > > > > > >
> > > > > > >
> > > > > > > To sum up:
> > > > > > > 1. Regardless of whether CEP chooses OC or side-input stream, a
> > > > decent
> > > > > > > 2-way communication control plane seems really helpful and has
> > been
> > > > > > proven
> > > > > > > by quite a few existing use cases. To me the question is how to
> > > make
> > > > it
> > > > > > > more robust, not whether we should use it or not.
> > > > > > > 2. As for CEP, if we assume OC will be robust enough,
> personally
> > I
> > > > feel
> > > > > > OC
> > > > > > > is a better fit than side-input mainly because of the
> simplicity
> > > and
> > > > > > > extensibility.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Dec 30, 2021 at 8:43 PM David Morávek <d...@apache.org
> >
> > > > wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > sorry for the late reply, vacation season ;) I'm still not
> 100%
> > > > sold
> > > > > on
> > > > > > > > choosing the OC for this use-case, but on the other hand I
> > don't
> > > > have
> > > > > > > > strong arguments against it. Few more questions / thoughts:
> > > > > > > >
> > > > > > > > We're still talking about the "web server based"
> > > > > > > > pattern_processor_discoverer, but what about other use cases?
> > One
> > > > of
> > > > > my
> > > > > > > big
> > > > > > > > concerns is that user's can not really reuse any part of the
> > > Flink
> > > > > > > > ecosystem to implement the discovery logic. For example if
> they
> > > > want
> > > > > to
> > > > > > > > read patterns from Kafka topic, they need to roll their own
> > > > > discoverer
> > > > > > > > based on the vanilla Kafka client. If we're talking about
> > > > > > extensibility,
> > > > > > > > should we also make sure that the existing primitives can be
> > > > reused?
> > > > > > > >
> > > > > > > > For instance, the example I gave in my previous email seems
> not
> > > > > easily
> > > > > > > > > achievable with side-input / broadcast streams: a single
> > > invalid
> > > > > > > pattern
> > > > > > > > > detected on a TM can be disabled elegantly globally without
> > > > > crashing
> > > > > > > the
> > > > > > > > > entire Flink job.
> > > > > > > >
> > > > > > > >
> > > > > > > > This can be done for the side-input as well by filtering
> > invalid
> > > > > > patterns
> > > > > > > > before the broadcast. You can also send the invalid patterns
> to
> > > any
> > > > > > side
> > > > > > > > output you want. I have a feeling that we're way too attached
> > to
> > > > the
> > > > > > REST
> > > > > > > > server use case in this discussion. I agree that for that
> case,
> > > > this
> > > > > > > > solution is the most straightforward one.
> > > > > > > >
> > > > > > > > OC is a 2-way communication mechanism, i.e. a subtask can
> also
> > > send
> > > > > > > > > OperatorEvent to the coordinator to report its owns status,
> > so
> > > > that
> > > > > > the
> > > > > > > > > coordinator can act accordingly.
> > > > > > > > >
> > > > > > > >
> > > > > > > > I agree that 2-way communication in the "data-flow like" API
> is
> > > > > tricky,
> > > > > > > > because it requires cycles / iterations, which are still not
> > > really
> > > > > > > solved
> > > > > > > > (for a good reason, it's really tough nut to crack). This
> makes
> > > me
> > > > > > think
> > > > > > > > that the OC may be bit of a "incomplete" workaround for not
> > > having
> > > > > > fully
> > > > > > > > working support for iterations.
> > > > > > > >
> > > > > > > > For example I'm not really confident that the checkpointing
> of
> > > the
> > > > OC
> > > > > > > works
> > > > > > > > correctly right now, because it doesn't seem to require
> > > checkpoint
> > > > > > > barrier
> > > > > > > > alignment as the regular stream inputs. We also don't have a
> > > proper
> > > > > > > support
> > > > > > > > for watermarking (this is again tricky, because of the
> cycle).
> > > > > > > >
> > > > > > > > If we decide to go down this road, should we first address
> some
> > > of
> > > > > > these
> > > > > > > > limitations?
> > > > > > > >
> > > > > > > > OperatorCoordinator will checkpoint the full amount of
> > > > > PatternProcessor
> > > > > > > > > data. For the reprocessing of historical data, you can read
> > the
> > > > > > > > > PatternProcessor snapshots saved by this checkpoint from a
> > > > certain
> > > > > > > > > historical checkpoint, and then recreate the historical
> data
> > > > > through
> > > > > > > > these
> > > > > > > > > PatternProcessor snapshots.
> > > > > > > > >
> > > > > > > >
> > > > > > > > If I understand that correctly, this means only the LATEST
> > state
> > > of
> > > > > the
> > > > > > > > patterns (in other words - patterns that are currently in
> use).
> > > Is
> > > > > this
> > > > > > > > really sufficient for historical re-processing? Can someone
> for
> > > > > example
> > > > > > > > want re-process the data in more of a "temporal join"
> fashion?
> > > Also
> > > > > > AFAIK
> > > > > > > > historical processing in combination with "coordinator
> > > checkpoints"
> > > > > is
> > > > > > > not
> > > > > > > > really something that we currently support of the box, are
> > there
> > > > any
> > > > > > > plans
> > > > > > > > on tackling this (my other concern is that this should not go
> > > > against
> > > > > > the
> > > > > > > > "unified batch & stream processing" efforts)?
> > > > > > > >
> > > > > > > > I do agree that having the user defined control logic defined
> > in
> > > > the
> > > > > JM
> > > > > > > > > increases the chance of instability.
> > > > > > > > >
> > > > > > > >
> > > > > > > > I can imagine that if this should be a concern, we could move
> > the
> > > > > > > execution
> > > > > > > > of the OC to the task managers. This also makes me thing,
> that
> > we
> > > > > > > shouldn't
> > > > > > > > make any strong assumptions that the OC will always run in
> the
> > > > > > JobManager
> > > > > > > > (this is especially relevant for the embedded web-server use
> > > case).
> > > > > > > >
> > > > > > > > If an agreement is reached on OperatorCoodinator, I will
> start
> > > the
> > > > > > voting
> > > > > > > > > thread.
> > > > > > > > >
> > > > > > > >
> > > > > > > > As for the vote, I'd would be great if we can wait until the
> > next
> > > > > week
> > > > > > as
> > > > > > > > many people took vacation until end of the year.
> > > > > > > >
> > > > > > > > Overall, I really like the feature, this will be a great
> > addition
> > > > to
> > > > > > > Flink.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > D.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Dec 30, 2021 at 11:27 AM Martijn Visser <
> > > > > mart...@ververica.com
> > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > I can understand the need for a control plane mechanism.
> I'm
> > > not
> > > > > the
> > > > > > > > > technical go-to person for questions on the
> > > OperatorCoordinator,
> > > > > but
> > > > > > I
> > > > > > > > > would expect that we could offer those interfaces from
> Flink
> > > but
> > > > > > > > shouldn't
> > > > > > > > > recommend running user-code in the JobManager itself. I
> think
> > > the
> > > > > > user
> > > > > > > > code
> > > > > > > > > (like a webserver) should run outside of Flink (like via a
> > > > sidecar)
> > > > > > and
> > > > > > > > use
> > > > > > > > > only the provided interfaces to communicate.
> > > > > > > > >
> > > > > > > > > I would like to get @David Morávek <d...@apache.org>
> opinion
> > > on
> > > > > the
> > > > > > > > > technical part.
> > > > > > > > >
> > > > > > > > > Best regards,
> > > > > > > > >
> > > > > > > > > Martijn
> > > > > > > > >
> > > > > > > > > On Thu, 30 Dec 2021 at 10:07, Nicholas Jiang <
> > > > > > nicholasji...@apache.org
> > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> Hi Konstantin, Becket, Martijn,
> > > > > > > > >>
> > > > > > > > >> Thanks for sharing your feedback. What other concerns do
> you
> > > > have
> > > > > > > about
> > > > > > > > >> OperatorCoodinator? If an agreement is reached on
> > > > > > OperatorCoodinator,
> > > > > > > I
> > > > > > > > >> will start the voting thread.
> > > > > > > > >>
> > > > > > > > >> Best,
> > > > > > > > >> Nicholas Jiang
> > > > > > > > >>
> > > > > > > > >> On 2021/12/22 03:19:58 Becket Qin wrote:
> > > > > > > > >> > Hi Konstantin,
> > > > > > > > >> >
> > > > > > > > >> > Thanks for sharing your thoughts. Please see the reply
> > > inline
> > > > > > below.
> > > > > > > > >> >
> > > > > > > > >> > Thanks,
> > > > > > > > >> >
> > > > > > > > >> > Jiangjie (Becket) Qin
> > > > > > > > >> >
> > > > > > > > >> > On Tue, Dec 21, 2021 at 7:14 PM Konstantin Knauf <
> > > > > > kna...@apache.org
> > > > > > > >
> > > > > > > > >> wrote:
> > > > > > > > >> >
> > > > > > > > >> > > Hi Becket, Hi Nicholas,
> > > > > > > > >> > >
> > > > > > > > >> > > Thanks for joining the discussion.
> > > > > > > > >> > >
> > > > > > > > >> > > 1 ) Personally, I would argue that we should only run
> > user
> > > > > code
> > > > > > in
> > > > > > > > the
> > > > > > > > >> > > Jobmanager/Jobmaster if we can not avoid it. It seems
> > > wrong
> > > > to
> > > > > > me
> > > > > > > to
> > > > > > > > >> > > encourage users to e.g. run a webserver on the
> > Jobmanager,
> > > > or
> > > > > > > > >> continuously
> > > > > > > > >> > > read patterns from a Kafka Topic on the Jobmanager,
> but
> > > both
> > > > > of
> > > > > > > > these
> > > > > > > > >> I see
> > > > > > > > >> > > happening with the current design. We've had lots of
> > > issues
> > > > > with
> > > > > > > > >> > > classloading leaks and other stability issues on the
> > > > > Jobmanager
> > > > > > > and
> > > > > > > > >> making
> > > > > > > > >> > > this more complicated, if there is another way, seems
> > > > > > unnecessary.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > I think the key question here is what primitive does
> Flink
> > > > > provide
> > > > > > > to
> > > > > > > > >> > facilitate the user implementation of their own control
> > > logic
> > > > /
> > > > > > > > control
> > > > > > > > >> > plane? It looks that previously, Flink assumes that all
> > the
> > > > user
> > > > > > > logic
> > > > > > > > >> is
> > > > > > > > >> > just data processing logic without any control /
> > > coordination
> > > > > > > > >> requirements.
> > > > > > > > >> > However, it turns out that a decent control plane
> > > abstraction
> > > > is
> > > > > > > > >> required
> > > > > > > > >> > in association with the data processing logic in many
> > cases,
> > > > > > > including
> > > > > > > > >> > Source / Sink and other user defined operators in
> general.
> > > The
> > > > > > fact
> > > > > > > > >> that we
> > > > > > > > >> > ended up with adding the SplitEnumerator and
> > GlobalCommitter
> > > > are
> > > > > > > just
> > > > > > > > >> two
> > > > > > > > >> > examples of the demand of such coordination among user
> > > defined
> > > > > > > logics.
> > > > > > > > >> > There are other cases that we see in ecosystem projects,
> > > such
> > > > as
> > > > > > > > >> > deep-learning-on-flink[1]. Now we see this again in CEP.
> > > > > > > > >> >
> > > > > > > > >> > Such control plane primitives are critical to the
> > > > extensibility
> > > > > > of a
> > > > > > > > >> > project. If we look at other projects, exposing such
> > control
> > > > > plane
> > > > > > > > >> logic is
> > > > > > > > >> > quite common. For example, Hadoop ended up with exposing
> > > YARN
> > > > > as a
> > > > > > > > >> public
> > > > > > > > >> > API to the users, which is extremely popular. Kafka
> > > consumers
> > > > > > > exposed
> > > > > > > > >> the
> > > > > > > > >> > consumer group rebalance logic to the users via
> > > > > > > > >> ConsumerPartitionAssigner,
> > > > > > > > >> > which is also a control plane primitive.
> > > > > > > > >> >
> > > > > > > > >> > To me it is more important to think about how we can
> > improve
> > > > the
> > > > > > > > >> stability
> > > > > > > > >> > of such a control plane mechanism, instead of simply
> > saying
> > > no
> > > > > to
> > > > > > > the
> > > > > > > > >> users.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > > 2) In addition, I suspect that, over time we will have
> > to
> > > > > > > implement
> > > > > > > > >> all the
> > > > > > > > >> > > functionality that regular sources already provide
> > around
> > > > > > > > consistency
> > > > > > > > >> > > (watermarks, checkpointing) for the
> > > > > PatternProcessorCoordinator,
> > > > > > > > too.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > I think OperatorCoordinator should have a generic
> > > > communication
> > > > > > > > >> mechanism
> > > > > > > > >> > for all the operators, not specific to Source. We should
> > > > > probably
> > > > > > > have
> > > > > > > > >> an
> > > > > > > > >> > AbstractOperatorCoordinator help dealing with the
> > > > communication
> > > > > > > layer,
> > > > > > > > >> and
> > > > > > > > >> > leave the state maintenance and event handling logic to
> > the
> > > > user
> > > > > > > code.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > > 3) I understand that running on the Jobmanager is
> easier
> > > if
> > > > > you
> > > > > > > want
> > > > > > > > >> to
> > > > > > > > >> > > launch a REST server directly. Here my question would
> > be:
> > > > does
> > > > > > > this
> > > > > > > > >> really
> > > > > > > > >> > > need to be solved inside of Flink or couldn't you
> start
> > a
> > > > > > > webserver
> > > > > > > > >> next to
> > > > > > > > >> > > Flink? If we start using the Jobmanager as a REST
> server
> > > > users
> > > > > > > will
> > > > > > > > >> expect
> > > > > > > > >> > > that e.g. it is highly available and can be load
> > balanced
> > > > and
> > > > > we
> > > > > > > > >> quickly
> > > > > > > > >> > > need to think about aspects that we never wanted to
> > think
> > > > > about
> > > > > > in
> > > > > > > > the
> > > > > > > > >> > > context of a Flink Jobmanager.
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >> > I think the REST API is just for receiving commands
> > > targeting
> > > > a
> > > > > > > > running
> > > > > > > > >> > Flink job. If the job fails, the REST API would be
> > useless.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > > So, can you elaborate a bit more, why a
> > > side-input/broadcast
> > > > > > > stream
> > > > > > > > is
> > > > > > > > >> > >
> > > > > > > > >> > > a) more difficult
> > > > > > > > >> > > b) has vague semantics (To me semantics of a
> > stream-stream
> > > > > seem
> > > > > > > > >> clearer
> > > > > > > > >> > > when it comes to out-of-orderness, late data,
> > reprocessing
> > > > or
> > > > > > > batch
> > > > > > > > >> > > execution mode.)
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > I do agree that having the user defined control logic
> > > defined
> > > > in
> > > > > > the
> > > > > > > > JM
> > > > > > > > >> > increases the chance of instability. In that case, we
> may
> > > > think
> > > > > of
> > > > > > > > other
> > > > > > > > >> > solutions and I am fully open to that. But the
> side-input
> > /
> > > > > > > broadcast
> > > > > > > > >> > stream seems more like a bandaid instead of a carefully
> > > > designed
> > > > > > > > control
> > > > > > > > >> > plane mechanism.
> > > > > > > > >> >
> > > > > > > > >> > A decent control plane requires two-way communication,
> so
> > > > > > > information
> > > > > > > > >> can
> > > > > > > > >> > be reported / collected from the entity being
> controlled,
> > > and
> > > > > the
> > > > > > > > >> > coordinator / controller can send decisions or commands
> to
> > > the
> > > > > > > > entities
> > > > > > > > >> > accordingly, just like our TM / JM communication. IIUC,
> > this
> > > > is
> > > > > > not
> > > > > > > > >> > achievable with the existing side-input / broadcast
> stream
> > > as
> > > > > both
> > > > > > > of
> > > > > > > > >> them
> > > > > > > > >> > are one-way communication mechanisms. For instance, the
> > > > example
> > > > > I
> > > > > > > gave
> > > > > > > > >> in
> > > > > > > > >> > my previous email seems not easily achievable with
> > > side-input
> > > > /
> > > > > > > > >> broadcast
> > > > > > > > >> > streams: a single invalid pattern detected on a TM can
> be
> > > > > disabled
> > > > > > > > >> > elegantly globally without crashing the entire Flink
> job.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > > Cheers,
> > > > > > > > >> > >
> > > > > > > > >> > > Konstantin
> > > > > > > > >> > >
> > > > > > > > >> > >
> > > > > > > > >> > > On Tue, Dec 21, 2021 at 11:38 AM Becket Qin <
> > > > > > becket....@gmail.com
> > > > > > > >
> > > > > > > > >> wrote:
> > > > > > > > >> > >
> > > > > > > > >> > > > Hi folks,
> > > > > > > > >> > > >
> > > > > > > > >> > > > I just finished reading the previous emails. It was
> a
> > > good
> > > > > > > > >> discussion.
> > > > > > > > >> > > Here
> > > > > > > > >> > > > are my two cents:
> > > > > > > > >> > > >
> > > > > > > > >> > > > *Is OperatorCoordinator a public API?*
> > > > > > > > >> > > > Regarding the OperatorCoordinator, although it is
> > marked
> > > > as
> > > > > > > > internal
> > > > > > > > >> > > > interface at this point, I intended to make it a
> > public
> > > > > > > interface
> > > > > > > > >> when
> > > > > > > > >> > > add
> > > > > > > > >> > > > that in FLIP-27. This is a powerful cross-subtask
> > > > > > communication
> > > > > > > > >> mechanism
> > > > > > > > >> > > > that enables many use cases, Source / Sink / TF on
> > > Flink /
> > > > > CEP
> > > > > > > > here
> > > > > > > > >> > > again.
> > > > > > > > >> > > > To my understanding, OC was marked as internal
> because
> > > we
> > > > > > think
> > > > > > > it
> > > > > > > > >> is not
> > > > > > > > >> > > > stable enough yet. We may need to fortify the
> > > > OperatorEvent
> > > > > > > > delivery
> > > > > > > > >> > > > semantic a little bit so it works well with
> checkpoint
> > > in
> > > > > > > general.
> > > > > > > > >> > > >
> > > > > > > > >> > > > I think it is a valid concern that user code running
> > in
> > > JM
> > > > > may
> > > > > > > > cause
> > > > > > > > >> > > > instability. However, not providing this
> communication
> > > > > > mechanism
> > > > > > > > >> only
> > > > > > > > >> > > makes
> > > > > > > > >> > > > a lot of use case even harder to implement. So it
> > seems
> > > > that
> > > > > > > > having
> > > > > > > > >> the
> > > > > > > > >> > > OC
> > > > > > > > >> > > > exposed to end users brings more benefits than harm
> to
> > > > > Flink.
> > > > > > At
> > > > > > > > >> the end
> > > > > > > > >> > > of
> > > > > > > > >> > > > the day, users have many ways to crash a Flink job
> if
> > > they
> > > > > do
> > > > > > > not
> > > > > > > > >> write
> > > > > > > > >> > > > proper code. So making those who know what they do
> > happy
> > > > > seems
> > > > > > > > more
> > > > > > > > >> > > > important here.
> > > > > > > > >> > > >
> > > > > > > > >> > > > *OperatorCoordinator V.S. side-input / broadcast
> > stream*
> > > > > > > > >> > > > I think both of them can achieve the goal of dynamic
> > > > > patterns.
> > > > > > > The
> > > > > > > > >> main
> > > > > > > > >> > > > difference is the extensibility.
> > > > > > > > >> > > >
> > > > > > > > >> > > > OC is a 2-way communication mechanism, i.e. a
> subtask
> > > can
> > > > > also
> > > > > > > > send
> > > > > > > > >> > > > OperatorEvent to the coordinator to report its owns
> > > > status,
> > > > > so
> > > > > > > > that
> > > > > > > > >> the
> > > > > > > > >> > > > coordinator can act accordingly. This is sometimes
> > > useful.
> > > > > For
> > > > > > > > >> example, a
> > > > > > > > >> > > > single invalid pattern can be disabled elegantly
> > without
> > > > > > > crashing
> > > > > > > > >> the
> > > > > > > > >> > > > entire Flink job. In the future, if we allow users
> to
> > > send
> > > > > > > > external
> > > > > > > > >> > > command
> > > > > > > > >> > > > to OC via JM, a default self-contained
> implementation
> > > can
> > > > > just
> > > > > > > > >> update the
> > > > > > > > >> > > > pattern via the REST API without external
> > dependencies.
> > > > > > > > >> > > >
> > > > > > > > >> > > > Another reason I personally prefer OC is because it
> is
> > > an
> > > > > > > explicit
> > > > > > > > >> > > control
> > > > > > > > >> > > > plain mechanism, where as the side-input / broadcast
> > > > stream
> > > > > > has
> > > > > > > > are
> > > > > > > > >> more
> > > > > > > > >> > > > vague semantic.
> > > > > > > > >> > > >
> > > > > > > > >> > > > Thanks,
> > > > > > > > >> > > >
> > > > > > > > >> > > > Jiangjie (Becket) Qin
> > > > > > > > >> > > >
> > > > > > > > >> > > > On Tue, Dec 21, 2021 at 4:25 PM David Morávek <
> > > > > > d...@apache.org>
> > > > > > > > >> wrote:
> > > > > > > > >> > > >
> > > > > > > > >> > > > > Hi Yunfeng,
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > thanks for drafting this FLIP, this will be a
> great
> > > > > addition
> > > > > > > > into
> > > > > > > > >> the
> > > > > > > > >> > > CEP
> > > > > > > > >> > > > > toolbox!
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > Apart from running user code in JM, which want to
> > > avoid
> > > > in
> > > > > > > > >> general, I'd
> > > > > > > > >> > > > > have one more another concern about using the
> > > > > > > > OperatorCoordinator
> > > > > > > > >> and
> > > > > > > > >> > > > that
> > > > > > > > >> > > > > is re-processing of the historical data. Any
> > thoughts
> > > > > about
> > > > > > > how
> > > > > > > > >> this
> > > > > > > > >> > > will
> > > > > > > > >> > > > > work with the OC?
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > I have a slight feeling that a side-input (custom
> > > > source /
> > > > > > > > >> operator +
> > > > > > > > >> > > > > broadcast) would a better fit for this case. This
> > > would
> > > > > > > simplify
> > > > > > > > >> the
> > > > > > > > >> > > > > consistency concerns (watermarks + pushback) and
> the
> > > > > > > > >> re-processing of
> > > > > > > > >> > > > > historical data.
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > Best,
> > > > > > > > >> > > > > D.
> > > > > > > > >> > > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > On Tue, Dec 21, 2021 at 6:47 AM Nicholas Jiang <
> > > > > > > > >> > > nicholasji...@apache.org
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > wrote:
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > > Hi Konstantin, Martijn
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > Thanks for the detailed feedback in the
> > discussion.
> > > > > What I
> > > > > > > > >> still have
> > > > > > > > >> > > > > left
> > > > > > > > >> > > > > > to answer/reply to:
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > -- Martijn: Just to be sure, this indeed would
> > mean
> > > > that
> > > > > > if
> > > > > > > > for
> > > > > > > > >> > > > whatever
> > > > > > > > >> > > > > > reason the heartbeat timeout, it would crash the
> > > job,
> > > > > > right?
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > IMO, if for whatever reason the heartbeat
> timeout,
> > > it
> > > > > > > couldn't
> > > > > > > > >> check
> > > > > > > > >> > > > the
> > > > > > > > >> > > > > > PatternProcessor consistency between the
> > > > > > OperatorCoordinator
> > > > > > > > >> and the
> > > > > > > > >> > > > > > subtasks so that the job would be crashed.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > -- Konstantin: What I was concerned about is
> that
> > we
> > > > > > > basically
> > > > > > > > >> let
> > > > > > > > >> > > > users
> > > > > > > > >> > > > > > run a UserFunction in the OperatorCoordinator,
> > which
> > > > it
> > > > > > does
> > > > > > > > >> not seem
> > > > > > > > >> > > > to
> > > > > > > > >> > > > > > have been designed for.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > In general, we have reached an agreement on the
> > > design
> > > > > of
> > > > > > > this
> > > > > > > > >> FLIP,
> > > > > > > > >> > > > but
> > > > > > > > >> > > > > > there are some concerns on the
> > OperatorCoordinator,
> > > > > about
> > > > > > > > >> whether
> > > > > > > > >> > > > > basically
> > > > > > > > >> > > > > > let users run a UserFunction in the
> > > > OperatorCoordinator
> > > > > is
> > > > > > > > >> designed
> > > > > > > > >> > > for
> > > > > > > > >> > > > > > OperatorCoordinator. We would like to invite
> > Becket
> > > > Qin
> > > > > > who
> > > > > > > is
> > > > > > > > >> the
> > > > > > > > >> > > > author
> > > > > > > > >> > > > > > of OperatorCoordinator to help us to answer this
> > > > > concern.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > Best,
> > > > > > > > >> > > > > > Nicholas Jiang
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > On 2021/12/20 10:07:14 Martijn Visser wrote:
> > > > > > > > >> > > > > > > Hi all,
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > Really like the discussion on this topic
> moving
> > > > > > forward. I
> > > > > > > > >> really
> > > > > > > > >> > > > think
> > > > > > > > >> > > > > > > this feature will be much appreciated by the
> > Flink
> > > > > > users.
> > > > > > > > >> What I
> > > > > > > > >> > > > still
> > > > > > > > >> > > > > > have
> > > > > > > > >> > > > > > > left to answer/reply to:
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > -- Good point. If for whatever reason the
> > > different
> > > > > > > > >> taskmanagers
> > > > > > > > >> > > > can't
> > > > > > > > >> > > > > > get
> > > > > > > > >> > > > > > > the latest rule, the Operator Coordinator
> could
> > > > send a
> > > > > > > > >> heartbeat to
> > > > > > > > >> > > > all
> > > > > > > > >> > > > > > > taskmanagers with the latest rules and check
> the
> > > > > > heartbeat
> > > > > > > > >> response
> > > > > > > > >> > > > > from
> > > > > > > > >> > > > > > > all the taskmanagers whether the latest rules
> of
> > > the
> > > > > > > > >> taskmanager is
> > > > > > > > >> > > > > equal
> > > > > > > > >> > > > > > > to these of the Operator Coordinator.
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > Just to be sure, this indeed would mean that
> if
> > > for
> > > > > > > whatever
> > > > > > > > >> reason
> > > > > > > > >> > > > the
> > > > > > > > >> > > > > > > heartbeat timeout, it would crash the job,
> > right?
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > -- We have consided about the solution
> mentioned
> > > > > above.
> > > > > > In
> > > > > > > > >> this
> > > > > > > > >> > > > > > solution, I
> > > > > > > > >> > > > > > > have some questions about how to guarantee the
> > > > > > consistency
> > > > > > > > of
> > > > > > > > >> the
> > > > > > > > >> > > > rule
> > > > > > > > >> > > > > > > between each TaskManager. By having a
> coodinator
> > > in
> > > > > the
> > > > > > > > >> JobManager
> > > > > > > > >> > > to
> > > > > > > > >> > > > > > > centrally manage the latest rules, the latest
> > > rules
> > > > of
> > > > > > all
> > > > > > > > >> > > > TaskManagers
> > > > > > > > >> > > > > > are
> > > > > > > > >> > > > > > > consistent with those of the JobManager, so as
> > to
> > > > > avoid
> > > > > > > the
> > > > > > > > >> > > > > > inconsistencies
> > > > > > > > >> > > > > > > that may be encountered in the above solution.
> > Can
> > > > you
> > > > > > > > >> introduce
> > > > > > > > >> > > how
> > > > > > > > >> > > > > this
> > > > > > > > >> > > > > > > solution guarantees the consistency of the
> > rules?
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > The consistency that we could guarantee was
> > based
> > > on
> > > > > how
> > > > > > > > >> often each
> > > > > > > > >> > > > > > > TaskManager would do a refresh and how often
> we
> > > > would
> > > > > > > > accept a
> > > > > > > > >> > > > refresh
> > > > > > > > >> > > > > to
> > > > > > > > >> > > > > > > fail. We set the refresh time to a relatively
> > > short
> > > > > one
> > > > > > > (30
> > > > > > > > >> > > seconds)
> > > > > > > > >> > > > > and
> > > > > > > > >> > > > > > > maximum failures to 3. That meant that we
> could
> > > > > > guarantee
> > > > > > > > that
> > > > > > > > >> > > rules
> > > > > > > > >> > > > > > would
> > > > > > > > >> > > > > > > be updated in < 2 minutes or else the job
> would
> > > > crash.
> > > > > > > That
> > > > > > > > >> was
> > > > > > > > >> > > > > > sufficient
> > > > > > > > >> > > > > > > for our use cases. This also really depends on
> > how
> > > > big
> > > > > > > your
> > > > > > > > >> cluster
> > > > > > > > >> > > > > is. I
> > > > > > > > >> > > > > > > can imagine that if you have a large scale
> > cluster
> > > > > that
> > > > > > > you
> > > > > > > > >> want to
> > > > > > > > >> > > > > run,
> > > > > > > > >> > > > > > > you don't want to DDOS the backend system
> where
> > > you
> > > > > have
> > > > > > > > your
> > > > > > > > >> rules
> > > > > > > > >> > > > > > stored.
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > -- In summary, the current design is that
> > > JobManager
> > > > > > tells
> > > > > > > > all
> > > > > > > > >> > > > > > TaskManagers
> > > > > > > > >> > > > > > > the latest rules through OperatorCoodinator,
> and
> > > > will
> > > > > > > > >> initiate a
> > > > > > > > >> > > > > > heartbeat
> > > > > > > > >> > > > > > > to check whether the latest rules on each
> > > > TaskManager
> > > > > > are
> > > > > > > > >> > > consistent.
> > > > > > > > >> > > > > We
> > > > > > > > >> > > > > > > will describe how to deal with the Failover
> > > scenario
> > > > > in
> > > > > > > more
> > > > > > > > >> detail
> > > > > > > > >> > > > on
> > > > > > > > >> > > > > > FLIP.
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > Thanks for that. I think having the JobManager
> > > tell
> > > > > the
> > > > > > > > >> > > TaskManagers
> > > > > > > > >> > > > > the
> > > > > > > > >> > > > > > > applicable rules would indeed end up being the
> > > best
> > > > > > > design.
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > -- about the concerns around consistency
> raised
> > by
> > > > > > > Martijn:
> > > > > > > > I
> > > > > > > > >> > > think a
> > > > > > > > >> > > > > lot
> > > > > > > > >> > > > > > > of those can be mitigated by using an event
> time
> > > > > > timestamp
> > > > > > > > >> from
> > > > > > > > >> > > which
> > > > > > > > >> > > > > the
> > > > > > > > >> > > > > > > rules take effect. The reprocessing scenario,
> > for
> > > > > > example,
> > > > > > > > is
> > > > > > > > >> > > covered
> > > > > > > > >> > > > > by
> > > > > > > > >> > > > > > > this. If a pattern processor should become
> > active
> > > as
> > > > > > soon
> > > > > > > as
> > > > > > > > >> > > > possible,
> > > > > > > > >> > > > > > > there will still be inconsistencies between
> > > > > > Taskmanagers,
> > > > > > > > but
> > > > > > > > >> "as
> > > > > > > > >> > > > soon
> > > > > > > > >> > > > > as
> > > > > > > > >> > > > > > > possible" is vague anyway, which is why I
> think
> > > > that's
> > > > > > ok.
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > I think an event timestamp is indeed a really
> > > > > important
> > > > > > > one.
> > > > > > > > >> We
> > > > > > > > >> > > also
> > > > > > > > >> > > > > used
> > > > > > > > >> > > > > > > that in my previous role, with the
> > > > > > ruleActivationTimestamp
> > > > > > > > >> compared
> > > > > > > > >> > > > to
> > > > > > > > >> > > > > > > eventtime (well, actually we used Kafka
> > logAppend
> > > > time
> > > > > > > > because
> > > > > > > > >> > > > > > > eventtime wasn't always properly set so we
> used
> > > that
> > > > > > time
> > > > > > > to
> > > > > > > > >> > > > overwrite
> > > > > > > > >> > > > > > the
> > > > > > > > >> > > > > > > eventtime from the event itself).
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > Best regards,
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > Martijn
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > On Mon, 20 Dec 2021 at 09:08, Konstantin
> Knauf <
> > > > > > > > >> kna...@apache.org>
> > > > > > > > >> > > > > > wrote:
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > > Hi Nicholas, Hi Junfeng,
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > about the concerns around consistency raised
> > by
> > > > > > > Martijn: I
> > > > > > > > >> think
> > > > > > > > >> > > a
> > > > > > > > >> > > > > lot
> > > > > > > > >> > > > > > of
> > > > > > > > >> > > > > > > > those can be mitigated by using an event
> time
> > > > > > timestamp
> > > > > > > > from
> > > > > > > > >> > > which
> > > > > > > > >> > > > > the
> > > > > > > > >> > > > > > > > rules take effect. The reprocessing
> scenario,
> > > for
> > > > > > > example,
> > > > > > > > >> is
> > > > > > > > >> > > > covered
> > > > > > > > >> > > > > > by
> > > > > > > > >> > > > > > > > this. If a pattern processor should become
> > > active
> > > > as
> > > > > > > soon
> > > > > > > > as
> > > > > > > > >> > > > > possible,
> > > > > > > > >> > > > > > > > there will still be inconsistencies between
> > > > > > > Taskmanagers,
> > > > > > > > >> but "as
> > > > > > > > >> > > > > soon
> > > > > > > > >> > > > > > as
> > > > > > > > >> > > > > > > > possible" is vague anyway, which is why I
> > think
> > > > > that's
> > > > > > > ok.
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > about naming: The naming with
> > "PatternProcessor"
> > > > > > sounds
> > > > > > > > >> good to
> > > > > > > > >> > > me.
> > > > > > > > >> > > > > > Final
> > > > > > > > >> > > > > > > > nit: I would go for CEP#patternProccessors,
> > > which
> > > > > > would
> > > > > > > be
> > > > > > > > >> > > > consistent
> > > > > > > > >> > > > > > with
> > > > > > > > >> > > > > > > > CEP#pattern.
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > I am not sure about one of the rejected
> > > > > alternatives:
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > > Have each subtask of an operator make the
> > > update
> > > > > on
> > > > > > > > their
> > > > > > > > >> own
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > >    -
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > >    It is hard to achieve consistency.
> > > > > > > > >> > > > > > > >    -
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > >       Though the time interval that each
> > subtask
> > > > > makes
> > > > > > > the
> > > > > > > > >> update
> > > > > > > > >> > > > can
> > > > > > > > >> > > > > > be
> > > > > > > > >> > > > > > > >       the same, the absolute time they make
> > the
> > > > > update
> > > > > > > > >> might be
> > > > > > > > >> > > > > > different.
> > > > > > > > >> > > > > > > > For
> > > > > > > > >> > > > > > > >       example, one makes updates at 10:00,
> > > 10:05,
> > > > > etc,
> > > > > > > > while
> > > > > > > > >> > > > another
> > > > > > > > >> > > > > > does
> > > > > > > > >> > > > > > > > it at
> > > > > > > > >> > > > > > > >       10:01, 10:06. In this case the
> subtasks
> > > > might
> > > > > > > never
> > > > > > > > >> > > > processing
> > > > > > > > >> > > > > > data
> > > > > > > > >> > > > > > > > with
> > > > > > > > >> > > > > > > >       the same set of pattern processors.
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > I would have thought that it is quite easy
> to
> > > poll
> > > > > for
> > > > > > > the
> > > > > > > > >> rules
> > > > > > > > >> > > > from
> > > > > > > > >> > > > > > each
> > > > > > > > >> > > > > > > > Subtask at *about *the same time. So, this
> > alone
> > > > > does
> > > > > > > not
> > > > > > > > >> seem to
> > > > > > > > >> > > > be
> > > > > > > > >> > > > > > > > enough to rule out this option. I've looped
> in
> > > > David
> > > > > > > > >> Moravek to
> > > > > > > > >> > > get
> > > > > > > > >> > > > > his
> > > > > > > > >> > > > > > > > opinion of the additional load imposed on
> the
> > > JM.
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > Thanks,
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > Konstantin
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > On Mon, Dec 20, 2021 at 4:06 AM Nicholas
> > Jiang <
> > > > > > > > >> > > > > > nicholasji...@apache.org>
> > > > > > > > >> > > > > > > > wrote:
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > > Hi Yue,
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > Thanks for your feedback of the FLIP. I
> have
> > > > > > addressed
> > > > > > > > >> your
> > > > > > > > >> > > > > > questions and
> > > > > > > > >> > > > > > > > > made a corresponding explanation as
> follows:
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > -- About Pattern Updating. If we use
> > > > > > > > >> > > PatternProcessoerDiscoverer
> > > > > > > > >> > > > to
> > > > > > > > >> > > > > > > > update
> > > > > > > > >> > > > > > > > > the rules, will it increase the load of
> JM?
> > > For
> > > > > > > example,
> > > > > > > > >> if the
> > > > > > > > >> > > > > user
> > > > > > > > >> > > > > > > > wants
> > > > > > > > >> > > > > > > > > the updated rule to take effect
> > immediately,,
> > > > > which
> > > > > > > > means
> > > > > > > > >> that
> > > > > > > > >> > > we
> > > > > > > > >> > > > > > need to
> > > > > > > > >> > > > > > > > > set a shorter check interval or there is
> > > another
> > > > > > > > scenario
> > > > > > > > >> when
> > > > > > > > >> > > > > users
> > > > > > > > >> > > > > > > > rarely
> > > > > > > > >> > > > > > > > > update the pattern, will the
> > > > > > > PatternProcessoerDiscoverer
> > > > > > > > >> be in
> > > > > > > > >> > > > most
> > > > > > > > >> > > > > > of
> > > > > > > > >> > > > > > > > the
> > > > > > > > >> > > > > > > > > time Do useless checks ? Will a lazy
> update
> > > mode
> > > > > > could
> > > > > > > > be
> > > > > > > > >> used,
> > > > > > > > >> > > > > > which the
> > > > > > > > >> > > > > > > > > pattern only be updated when triggered by
> > the
> > > > > user,
> > > > > > > and
> > > > > > > > do
> > > > > > > > >> > > > nothing
> > > > > > > > >> > > > > at
> > > > > > > > >> > > > > > > > other
> > > > > > > > >> > > > > > > > > times?
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > PatternProcessoerDiscoverer is a
> > user-defined
> > > > > > > interface
> > > > > > > > to
> > > > > > > > >> > > > discover
> > > > > > > > >> > > > > > the
> > > > > > > > >> > > > > > > > > PatternProcessor updates. Periodically
> > > checking
> > > > > the
> > > > > > > > >> > > > > PatternProcessor
> > > > > > > > >> > > > > > in
> > > > > > > > >> > > > > > > > the
> > > > > > > > >> > > > > > > > > database is a implementation of the
> > > > > > > > >> PatternProcessoerDiscoverer
> > > > > > > > >> > > > > > > > interface,
> > > > > > > > >> > > > > > > > > which is that periodically querys all the
> > > > > > > > PatternProcessor
> > > > > > > > >> > > table
> > > > > > > > >> > > > in
> > > > > > > > >> > > > > > > > certain
> > > > > > > > >> > > > > > > > > interval. This implementation indeeds has
> > the
> > > > > > useless
> > > > > > > > >> checks,
> > > > > > > > >> > > and
> > > > > > > > >> > > > > > could
> > > > > > > > >> > > > > > > > > directly integrates the changelog of the
> > > table.
> > > > In
> > > > > > > > >> addition, in
> > > > > > > > >> > > > > > addition
> > > > > > > > >> > > > > > > > to
> > > > > > > > >> > > > > > > > > the implementation of periodically
> checking
> > > the
> > > > > > > > database,
> > > > > > > > >> there
> > > > > > > > >> > > > are
> > > > > > > > >> > > > > > other
> > > > > > > > >> > > > > > > > > implementations such as the
> PatternProcessor
> > > > that
> > > > > > > > provides
> > > > > > > > >> > > > Restful
> > > > > > > > >> > > > > > > > services
> > > > > > > > >> > > > > > > > > to receive updates.
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > --  I still have some confusion about how
> > Key
> > > > > > > Generating
> > > > > > > > >> > > > Opertator
> > > > > > > > >> > > > > > and
> > > > > > > > >> > > > > > > > > CepOperator (Pattern Matching & Processing
> > > > > Operator)
> > > > > > > > work
> > > > > > > > >> > > > together.
> > > > > > > > >> > > > > > If
> > > > > > > > >> > > > > > > > > there are N PatternProcessors, will the
> Key
> > > > > > Generating
> > > > > > > > >> > > Opertator
> > > > > > > > >> > > > > > > > generate N
> > > > > > > > >> > > > > > > > > keyedStreams, and then N CepOperator would
> > > > process
> > > > > > > each
> > > > > > > > >> Key
> > > > > > > > >> > > > > > separately ?
> > > > > > > > >> > > > > > > > Or
> > > > > > > > >> > > > > > > > > every CepOperator Task would process all
> > > > patterns,
> > > > > > if
> > > > > > > > so,
> > > > > > > > >> does
> > > > > > > > >> > > > the
> > > > > > > > >> > > > > > key
> > > > > > > > >> > > > > > > > type
> > > > > > > > >> > > > > > > > > in each PatternProcessor need to be the
> > same?
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > Firstly the Pattern Matching & Processing
> > > > Operator
> > > > > > is
> > > > > > > > not
> > > > > > > > >> the
> > > > > > > > >> > > > > > CepOperator
> > > > > > > > >> > > > > > > > > at present, because CepOperator mechanism
> is
> > > > based
> > > > > > on
> > > > > > > > the
> > > > > > > > >> > > > NFAState.
> > > > > > > > >> > > > > > > > > Secondly if there are N PatternProcessors,
> > the
> > > > Key
> > > > > > > > >> Generating
> > > > > > > > >> > > > > > Opertator
> > > > > > > > >> > > > > > > > > combines all the keyedStreams with keyBy()
> > > > > > operation,
> > > > > > > > >> thus the
> > > > > > > > >> > > > > > Pattern
> > > > > > > > >> > > > > > > > > Matching & Processing Operator would
> process
> > > all
> > > > > the
> > > > > > > > >> patterns.
> > > > > > > > >> > > In
> > > > > > > > >> > > > > > other
> > > > > > > > >> > > > > > > > > words, the KeySelector of the
> > PatternProcessor
> > > > is
> > > > > > used
> > > > > > > > >> for the
> > > > > > > > >> > > > Key
> > > > > > > > >> > > > > > > > > Generating Opertator, and the Pattern and
> > > > > > > > >> > > PatternProceessFunction
> > > > > > > > >> > > > > of
> > > > > > > > >> > > > > > the
> > > > > > > > >> > > > > > > > > PatternProcessor are used for the Pattern
> > > > > Matching &
> > > > > > > > >> Processing
> > > > > > > > >> > > > > > Operator.
> > > > > > > > >> > > > > > > > > Lastly the key type in each
> PatternProcessor
> > > is
> > > > > the
> > > > > > > > same,
> > > > > > > > >> > > > regarded
> > > > > > > > >> > > > > as
> > > > > > > > >> > > > > > > > > Object type.
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > -- Maybe need to pay attention to it when
> > > > > > implementing
> > > > > > > > it
> > > > > > > > >> .If
> > > > > > > > >> > > > some
> > > > > > > > >> > > > > > > > Pattern
> > > > > > > > >> > > > > > > > > has been removed or updated, will the
> > > partially
> > > > > > > matched
> > > > > > > > >> results
> > > > > > > > >> > > > in
> > > > > > > > >> > > > > > > > > StateBackend would be clean up or We rely
> on
> > > > state
> > > > > > ttl
> > > > > > > > to
> > > > > > > > >> clean
> > > > > > > > >> > > > up
> > > > > > > > >> > > > > > these
> > > > > > > > >> > > > > > > > > expired states.
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > If certain Pattern has been removed or
> > > updated,
> > > > > the
> > > > > > > > >> partially
> > > > > > > > >> > > > > matched
> > > > > > > > >> > > > > > > > > results in StateBackend would be clean up
> > > until
> > > > > the
> > > > > > > next
> > > > > > > > >> > > > > checkpoint.
> > > > > > > > >> > > > > > The
> > > > > > > > >> > > > > > > > > partially matched result doesn't depend on
> > the
> > > > > state
> > > > > > > ttl
> > > > > > > > >> of the
> > > > > > > > >> > > > > > > > > StateBackend.
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > 4. Will the PatternProcessorManager keep
> all
> > > the
> > > > > > > active
> > > > > > > > >> > > > > > PatternProcessor
> > > > > > > > >> > > > > > > > > in memory? We have also Support Multiple
> > Rule
> > > > and
> > > > > > > > Dynamic
> > > > > > > > >> Rule
> > > > > > > > >> > > > > > Changing.
> > > > > > > > >> > > > > > > > > But we are facing such a problem, some
> > users’
> > > > > usage
> > > > > > > > >> scenarios
> > > > > > > > >> > > are
> > > > > > > > >> > > > > > that
> > > > > > > > >> > > > > > > > they
> > > > > > > > >> > > > > > > > > want to have their own pattern for each
> > > user_id,
> > > > > > which
> > > > > > > > >> means
> > > > > > > > >> > > that
> > > > > > > > >> > > > > > there
> > > > > > > > >> > > > > > > > > could be thousands of patterns, which
> would
> > > make
> > > > > the
> > > > > > > > >> > > performance
> > > > > > > > >> > > > of
> > > > > > > > >> > > > > > > > Pattern
> > > > > > > > >> > > > > > > > > Matching very poor. We are also trying to
> > > solve
> > > > > this
> > > > > > > > >> problem.
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > The PatternProcessorManager keeps all the
> > > active
> > > > > > > > >> > > PatternProcessor
> > > > > > > > >> > > > > in
> > > > > > > > >> > > > > > > > > memory. For scenarios that they want to
> have
> > > > their
> > > > > > own
> > > > > > > > >> pattern
> > > > > > > > >> > > > for
> > > > > > > > >> > > > > > each
> > > > > > > > >> > > > > > > > > user_id, IMO, is it possible to reduce the
> > > > > > > fine-grained
> > > > > > > > >> pattern
> > > > > > > > >> > > > of
> > > > > > > > >> > > > > > > > > PatternProcessor to solve the performance
> > > > problem
> > > > > of
> > > > > > > the
> > > > > > > > >> > > Pattern
> > > > > > > > >> > > > > > > > Matching,
> > > > > > > > >> > > > > > > > > for example, a pattern corresponds to a
> > group
> > > of
> > > > > > > users?
> > > > > > > > >> The
> > > > > > > > >> > > > > scenarios
> > > > > > > > >> > > > > > > > > mentioned above need to be solved by case
> by
> > > > case.
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > Best,
> > > > > > > > >> > > > > > > > > Nicholas Jiang
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > > > On 2021/12/17 11:43:10 yue ma wrote:
> > > > > > > > >> > > > > > > > > > Glad to see the Community's progress in
> > > Flink
> > > > > CEP.
> > > > > > > > After
> > > > > > > > >> > > > reading
> > > > > > > > >> > > > > > this
> > > > > > > > >> > > > > > > > > Flip,
> > > > > > > > >> > > > > > > > > > I have few questions, would you please
> > take
> > > a
> > > > > look
> > > > > > > ?
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > > 1. About Pattern Updating. If we use
> > > > > > > > >> > > > PatternProcessoerDiscoverer
> > > > > > > > >> > > > > to
> > > > > > > > >> > > > > > > > > update
> > > > > > > > >> > > > > > > > > > the rules, will it increase the load of
> > JM?
> > > > For
> > > > > > > > >> example, if
> > > > > > > > >> > > the
> > > > > > > > >> > > > > > user
> > > > > > > > >> > > > > > > > > wants
> > > > > > > > >> > > > > > > > > > the updated rule to take effect
> > > immediately,,
> > > > > > which
> > > > > > > > >> means
> > > > > > > > >> > > that
> > > > > > > > >> > > > we
> > > > > > > > >> > > > > > need
> > > > > > > > >> > > > > > > > to
> > > > > > > > >> > > > > > > > > > set a shorter check interval  or there
> is
> > > > > another
> > > > > > > > >> scenario
> > > > > > > > >> > > when
> > > > > > > > >> > > > > > users
> > > > > > > > >> > > > > > > > > > rarely update the pattern, will the
> > > > > > > > >> > > PatternProcessoerDiscoverer
> > > > > > > > >> > > > > be
> > > > > > > > >> > > > > > in
> > > > > > > > >> > > > > > > > > most
> > > > > > > > >> > > > > > > > > > of the time Do useless checks ? Will a
> > lazy
> > > > > update
> > > > > > > > mode
> > > > > > > > >> could
> > > > > > > > >> > > > be
> > > > > > > > >> > > > > > used,
> > > > > > > > >> > > > > > > > > > which the pattern only be updated when
> > > > triggered
> > > > > > by
> > > > > > > > the
> > > > > > > > >> user,
> > > > > > > > >> > > > and
> > > > > > > > >> > > > > > do
> > > > > > > > >> > > > > > > > > > nothing at other times ?
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > > 2.   I still have some confusion about
> how
> > > Key
> > > > > > > > >> Generating
> > > > > > > > >> > > > > > Opertator and
> > > > > > > > >> > > > > > > > > > CepOperator (Pattern Matching &
> Processing
> > > > > > Operator)
> > > > > > > > >> work
> > > > > > > > >> > > > > > together. If
> > > > > > > > >> > > > > > > > > > there are N PatternProcessors, will the
> > Key
> > > > > > > Generating
> > > > > > > > >> > > > Opertator
> > > > > > > > >> > > > > > > > > generate N
> > > > > > > > >> > > > > > > > > > keyedStreams, and then N CepOperator
> would
> > > > > process
> > > > > > > > each
> > > > > > > > >> Key
> > > > > > > > >> > > > > > separately
> > > > > > > > >> > > > > > > > ?
> > > > > > > > >> > > > > > > > > Or
> > > > > > > > >> > > > > > > > > > every CepOperator Task would process all
> > > > > patterns,
> > > > > > > if
> > > > > > > > >> so,
> > > > > > > > >> > > does
> > > > > > > > >> > > > > the
> > > > > > > > >> > > > > > key
> > > > > > > > >> > > > > > > > > type
> > > > > > > > >> > > > > > > > > > in each PatternProcessor need to be the
> > > same ?
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > > 3. Maybe need to pay attention to it
> when
> > > > > > > implementing
> > > > > > > > >> it .If
> > > > > > > > >> > > > > some
> > > > > > > > >> > > > > > > > > Pattern
> > > > > > > > >> > > > > > > > > > has been removed or updateed  ,will the
> > > > > partially
> > > > > > > > >> matched
> > > > > > > > >> > > > results
> > > > > > > > >> > > > > > in
> > > > > > > > >> > > > > > > > > > StateBackend would be clean up or We
> rely
> > on
> > > > > state
> > > > > > > ttl
> > > > > > > > >> to
> > > > > > > > >> > > clean
> > > > > > > > >> > > > > up
> > > > > > > > >> > > > > > > > these
> > > > > > > > >> > > > > > > > > > expired states.
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > > 4. Will the PatternProcessorManager keep
> > all
> > > > the
> > > > > > > > active
> > > > > > > > >> > > > > > > > PatternProcessor
> > > > > > > > >> > > > > > > > > in
> > > > > > > > >> > > > > > > > > > memory ? We have also Support Multiple
> > Rule
> > > > and
> > > > > > > > Dynamic
> > > > > > > > >> Rule
> > > > > > > > >> > > > > > Changing .
> > > > > > > > >> > > > > > > > > > But we are facing such a problem, some
> > > users’
> > > > > > usage
> > > > > > > > >> scenarios
> > > > > > > > >> > > > are
> > > > > > > > >> > > > > > that
> > > > > > > > >> > > > > > > > > they
> > > > > > > > >> > > > > > > > > > want to have their own pattern for each
> > > > user_id,
> > > > > > > which
> > > > > > > > >> means
> > > > > > > > >> > > > that
> > > > > > > > >> > > > > > there
> > > > > > > > >> > > > > > > > > > could be thousands of patterns, which
> > would
> > > > make
> > > > > > the
> > > > > > > > >> > > > performance
> > > > > > > > >> > > > > of
> > > > > > > > >> > > > > > > > > Pattern
> > > > > > > > >> > > > > > > > > > Matching very poor. We are also trying
> to
> > > > solve
> > > > > > this
> > > > > > > > >> problem.
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > > Yunfeng Zhou <
> flink.zhouyunf...@gmail.com
> > >
> > > > > > > > >> 于2021年12月10日周五
> > > > > > > > >> > > > > 19:16写道:
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > > > > Hi all,
> > > > > > > > >> > > > > > > > > > >
> > > > > > > > >> > > > > > > > > > > I'm opening this thread to propose the
> > > > design
> > > > > to
> > > > > > > > >> support
> > > > > > > > >> > > > > multiple
> > > > > > > > >> > > > > > > > rule
> > > > > > > > >> > > > > > > > > &
> > > > > > > > >> > > > > > > > > > > dynamic rule changing in the Flink-CEP
> > > > > project,
> > > > > > as
> > > > > > > > >> > > described
> > > > > > > > >> > > > in
> > > > > > > > >> > > > > > > > > FLIP-200
> > > > > > > > >> > > > > > > > > > > [1]
> > > > > > > > >> > > > > > > > > > > .
> > > > > > > > >> > > > > > > > > > >
> > > > > > > > >> > > > > > > > > > > Currently Flink CEP only supports
> > having a
> > > > > > single
> > > > > > > > >> pattern
> > > > > > > > >> > > > > inside
> > > > > > > > >> > > > > > a
> > > > > > > > >> > > > > > > > > > > CepOperator and does not support
> > changing
> > > > the
> > > > > > > > pattern
> > > > > > > > >> > > > > > dynamically. In
> > > > > > > > >> > > > > > > > > order
> > > > > > > > >> > > > > > > > > > > to reduce resource consumption and to
> > > > > experience
> > > > > > > > >> shorter
> > > > > > > > >> > > > > downtime
> > > > > > > > >> > > > > > > > > during
> > > > > > > > >> > > > > > > > > > > pattern updates, there is a growing
> need
> > > in
> > > > > the
> > > > > > > > >> production
> > > > > > > > >> > > > > > > > environment
> > > > > > > > >> > > > > > > > > that
> > > > > > > > >> > > > > > > > > > > expects CEP to support having multiple
> > > > > patterns
> > > > > > in
> > > > > > > > one
> > > > > > > > >> > > > operator
> > > > > > > > >> > > > > > and
> > > > > > > > >> > > > > > > > to
> > > > > > > > >> > > > > > > > > > > support dynamically changing them.
> > > > Therefore I
> > > > > > > > >> propose to
> > > > > > > > >> > > add
> > > > > > > > >> > > > > > certain
> > > > > > > > >> > > > > > > > > > > infrastructure as described in
> FLIP-200
> > to
> > > > > > support
> > > > > > > > >> these
> > > > > > > > >> > > > > > > > > functionalities.
> > > > > > > > >> > > > > > > > > > >
> > > > > > > > >> > > > > > > > > > > Please feel free to reply to this
> email
> > > > > thread.
> > > > > > > > >> Looking
> > > > > > > > >> > > > forward
> > > > > > > > >> > > > > > to
> > > > > > > > >> > > > > > > > your
> > > > > > > > >> > > > > > > > > > > feedback!
> > > > > > > > >> > > > > > > > > > >
> > > > > > > > >> > > > > > > > > > > [1]
> > > > > > > > >> > > > > > > > > > >
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195730308
> > > > > > > > >> > > > > > > > > > >
> > > > > > > > >> > > > > > > > > > > Best regards,
> > > > > > > > >> > > > > > > > > > >
> > > > > > > > >> > > > > > > > > > > Yunfeng
> > > > > > > > >> > > > > > > > > > >
> > > > > > > > >> > > > > > > > > >
> > > > > > > > >> > > > > > > > >
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > --
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > Konstantin Knauf
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > https://twitter.com/snntrable
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > https://github.com/knaufk
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > >
> > > > > > > > >> > >
> > > > > > > > >> > > --
> > > > > > > > >> > >
> > > > > > > > >> > > Konstantin Knauf
> > > > > > > > >> > >
> > > > > > > > >> > > https://twitter.com/snntrable
> > > > > > > > >> > >
> > > > > > > > >> > > https://github.com/knaufk
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to