[ https://issues.apache.org/jira/browse/FLINK-23890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17551579#comment-17551579 ]
Yue Ma commented on FLINK-23890:
--------------------------------
[~dianfu]
For example, suppose a task has 100 keys, the QPS is 100 (assuming every key
receives data), and the watermark advances 20 times per second.
Before optimization 1), every second registers 100 timers in `processElement`
(one per record) and 100 * 20 = 2000 timers in `onEventTime` (keys * watermark
advances).
After optimization 1), only the 100 timers at the `event timestamp` (one per
record) are registered every second.
Therefore:
Before optimization 1), timers are registered and fired very frequently, which
consumes a lot of CPU and causes job lag.
After optimization 1), although the total number of pending timers may
increase, timers are registered and fired far less often, so CPU consumption
drops and operator performance improves.
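The arithmetic above, written out as a small Java sketch. It models only the numbers in this comment under a uniform-load assumption (every key receives data and has a pending timer every second); the class and method names are hypothetical and are not part of CepOperator:

```java
/**
 * Back-of-envelope model of the timer registrations per second described above.
 * Assumption: uniform load, every key receives data and holds a pending timer.
 * Names are hypothetical; this is not Flink code.
 */
class TimerRateEstimate {

    /** Before 1): one timer per record in processElement, plus one per key per watermark advance in onEventTime. */
    static long registrationsBefore(long recordsPerSecond, long keys, long watermarkAdvancesPerSecond) {
        return recordsPerSecond + keys * watermarkAdvancesPerSecond;
    }

    /** After 1): one timer per record, registered at the event timestamp. */
    static long registrationsAfter(long recordsPerSecond) {
        return recordsPerSecond;
    }

    public static void main(String[] args) {
        long keys = 100, recordsPerSecond = 100, watermarkAdvancesPerSecond = 20;
        System.out.println(registrationsBefore(recordsPerSecond, keys, watermarkAdvancesPerSecond)); // 2100
        System.out.println(registrationsAfter(recordsPerSecond));                                    // 100
    }
}
```

The point of the model is that the "before" term grows with keys * watermark advances, while the "after" term depends only on the record rate.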
> CepOperator may create a large number of timers and cause performance problems
> ------------------------------------------------------------------------------
>
> Key: FLINK-23890
> URL: https://issues.apache.org/jira/browse/FLINK-23890
> Project: Flink
> Issue Type: Improvement
> Components: Library / CEP
> Affects Versions: 1.12.1
> Reporter: Yue Ma
> Assignee: Nicholas Jiang
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2021-08-20-13-59-05-977.png,
> image-2022-06-07-21-27-03-814.png, image-2022-06-07-21-40-58-781.png
>
>
> There are two places in the CepOperator that may register a timer when
> dealing with event time.
> The first is processElement, which buffers the data first and then registers a
> timer with a timestamp of watermark + 1.
> {code:java}
> if (timestamp > timerService.currentWatermark()) {
>     // we have an event with a valid timestamp, so
>     // we buffer it until we receive the proper watermark.
>     saveRegisterWatermarkTimer();
>     bufferEvent(value, timestamp);
> }
> {code}
> The second is onEventTime: when the event-time timer fires and either
> sortedTimestamps or partialMatches is not empty, another timer is registered.
> {code:java}
> if (!sortedTimestamps.isEmpty() || !partialMatches.isEmpty()) {
>     saveRegisterWatermarkTimer();
> }
> {code}
>
> The problem is that if the partialMatches of every key are non-empty, then
> every time the watermark advances, the timers of all keys fire and a new
> event-time timer is re-registered under each key. When a task has a very large
> number of keys, this severely degrades performance.
> !https://code.byted.org/inf/flink/uploads/91aee639553df07fa376cf2865e91fd2/image.png!
> I think it is unnecessary to register event-time timers this frequently. Could
> we make the following changes?
> 1) When an event arrives, register the event-time timer at the event's own
> timestamp instead of watermark + 1.
> 2) When a new ComputationState with a window is created (e.g. a pattern with
> *within*), register an event-time timer at the window's timeout
> (EventTime + WindowTime).
> After trying this in our test environment, the number of registered timers
> dropped sharply and performance improved significantly.
> !https://code.byted.org/inf/flink/uploads/24b85492c6a34a35c4445a4fd46c8363/image.png!
>
>
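To see why the re-registration loop dominates, here is a toy Java model of both strategies over one second of input. It is not Flink code: the class, its methods, and the timer queue are hypothetical stand-ins for CepOperator and the timer service. It assumes one event per key spread over [0, 1000) ms, that every key keeps a non-empty partial match, that every event starts a new *within* window of 10 s, and that timers are deduplicated per (key, timestamp):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeMap;

/**
 * Toy model (NOT Flink code) comparing the current and the proposed timer strategy.
 * Assumptions: every key keeps a non-empty partial match, every event starts a new
 * windowed state, and timers are deduplicated per (key, timestamp).
 */
class TimerStrategyComparison {

    // timestamp -> keys that have a timer at that timestamp
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();
    private final boolean proposed;
    private final long windowTime;
    private long watermark = -1;
    long registered = 0;
    long fired = 0;

    TimerStrategyComparison(boolean proposed, long windowTime) {
        this.proposed = proposed;
        this.windowTime = windowTime;
    }

    private void register(String key, long timestamp) {
        // Set.add returns false for a duplicate (key, timestamp), which is not counted
        if (timers.computeIfAbsent(timestamp, t -> new HashSet<>()).add(key)) {
            registered++;
        }
    }

    void processElement(String key, long eventTime) {
        if (proposed) {
            register(key, eventTime);              // 1) timer at the event timestamp
            register(key, eventTime + windowTime); // 2) timer at the within() timeout
        } else {
            register(key, watermark + 1);          // current: timer at watermark + 1
        }
    }

    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            for (String key : timers.pollFirstEntry().getValue()) {
                fired++;
                if (!proposed) {
                    // current: partialMatches is non-empty, so the timer is re-registered
                    register(key, watermark + 1);
                }
            }
        }
    }

    /** One event per key over [0, 1000) ms, then `advances` evenly spaced watermarks. */
    static TimerStrategyComparison simulate(boolean proposed, int keys, int advances) {
        TimerStrategyComparison op = new TimerStrategyComparison(proposed, 10_000L);
        for (int k = 0; k < keys; k++) {
            op.processElement("key-" + k, (long) k * 1000 / keys);
        }
        for (int i = 1; i <= advances; i++) {
            op.advanceWatermark((long) i * 1000 / advances);
        }
        return op;
    }

    public static void main(String[] args) {
        for (boolean proposed : new boolean[] {false, true}) {
            for (int advances : new int[] {20, 200}) {
                TimerStrategyComparison op = simulate(proposed, 100, advances);
                System.out.printf("%s, %d advances/s: registered=%d, fired=%d%n",
                        proposed ? "proposed" : "current", advances, op.registered, op.fired);
            }
        }
    }
}
```

With 100 keys, the current strategy registers 2100 timers and fires 2000 at 20 watermark advances, and ten times as many fire at 200 advances. The proposed strategy stays at 200 registered (one event timer plus one timeout timer per event) and 100 fired in both cases, so under these assumptions its cost follows the event rate rather than the watermark rate.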
--
This message was sent by Atlassian Jira
(v8.20.7#820007)