[
https://issues.apache.org/jira/browse/BEAM-9308?focusedWorklogId=386743&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386743
]
ASF GitHub Bot logged work on BEAM-9308:
----------------------------------------
Author: ASF GitHub Bot
Created on: 13/Feb/20 16:50
Start Date: 13/Feb/20 16:50
Worklog Time Spent: 10m
Work Description: steveniemitz commented on pull request #10852:
[BEAM-9308] Decorrelate state cleanup timers
URL: https://github.com/apache/beam/pull/10852
In our larger streaming pipelines, we generally observe a short blip (1-3
minutes) in event processing, as well as an increase in lag following window
closing. One reason for this is the state cleanup timers all firing once a
window closes.
We've been running this PR in our dev environment for a few days now, and
the results are impressive. By decorrelating (jittering) the state cleanup
timer, we spread the timer load across a short period of time, with the
trade-off of holding state for a slightly longer period of time. In practice
though, I've actually noticed our state cleans up QUICKER with this change,
because the timers don't all compete with each other.
I'd like to contribute this back (and could modify the core StatefulDoFn
runner as well) if we agree this is something useful.
There's a couple points for discussion:
- I chose 3 minutes arbitrarily based on some experimentation, should this
be configurable somehow?
- I use the "user" key (from their KV input) to derive a consistent jitter
amount. The only real reason for this is to prevent the timer from moving
around each element (if we used just a random amount each time instead). I'm
not sure if this actually matters in practice, since timers are supposed to be
cheap to reset?
- I added a counter which has been useful for debugging (and seeing how many
keys are active each window), but could be removed.
Interested to hear thoughts here.
Here's a before and after of our pubsub latency:
before:

after:

Based on the counter I added, we're firing ~20 million timers, across 50
workers = ~400,000 timers / worker. So rather than having them all fire in one
shot, we can spread them over 3 minutes, for only ~2,000 timers / sec, which is
much more reasonable.
cc @lukecwik @pabloem
------------------------
Thank you for your contribution! Follow this checklist to help us
incorporate your contribution quickly and easily:
- [x] [**Choose
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and
mention them in a comment (`R: @username`).
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA
issue, if applicable. This will automatically link the pull request to the
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [x] If this contribution is large, please file an Apache [Individual
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
See the [Contributor Guide](https://beam.apache.org/contribute) for more
tips on [how to make review process
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
Post-Commit Tests Status (on master branch)
------------------------------------------------------------------------------------------------
Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
--- | --- | --- | --- | --- | --- | --- | ---
Go | [](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
| --- | --- | [](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
| --- | --- | [](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
Java | [](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
| [](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/)
| [](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)
| [](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
| [](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/)
| [](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
| [](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
Python | [](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)
| --- | [](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/)
| [](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/)
| --- | --- | [](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
XLang | --- | --- | --- | [](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/)
| --- | --- | ---
Pre-Commit Tests Status (on master branch)
------------------------------------------------------------------------------------------------
--- |Java | Python | Go | Website
--- | --- | --- | --- | ---
Non-portable | [](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/)
| [](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)
| [](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/)
| [](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/)
Portable | --- | [](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/)
| --- | ---
See
[.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md)
for trigger phrase, status and link of all Jenkins jobs.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 386743)
Remaining Estimate: 0h
Time Spent: 10m
> Optimize state cleanup at end-of-window
> ---------------------------------------
>
> Key: BEAM-9308
> URL: https://issues.apache.org/jira/browse/BEAM-9308
> Project: Beam
> Issue Type: Improvement
> Components: runner-dataflow
> Reporter: Steve Niemitz
> Assignee: Steve Niemitz
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
> When using state with a large keyspace, you can end up with a large amount of
> state cleanup timers set to fire all 1ms after the end of a window. This can
> cause a momentary (I've observed 1-3 minute) lag in processing while windmill
> and the java harness fire and process these cleanup timers.
> By spreading the firing over a short period after the end of the window, we
> can decorrelate the firing of the timers and smooth the load out, resulting
> in much less impact from state cleanup.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)