[
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=366496&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-366496
]
ASF GitHub Bot logged work on BEAM-7949:
----------------------------------------
Author: ASF GitHub Bot
Created on: 06/Jan/20 06:41
Start Date: 06/Jan/20 06:41
Worklog Time Spent: 10m
Work Description: sunjincheng121 commented on pull request #10246:
[BEAM-7949] Add time-based cache threshold support in the data service of the
Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r363174479
##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker_main.py
##########
@@ -200,6 +203,29 @@ def _get_state_cache_size(pipeline_options):
   return 0
+def _get_data_buffer_time_limit_ms(pipeline_options):
+  """Defines the time limit of the outbound data buffering.
+
+  Note: data_buffer_time_limit_ms is an experimental flag and might
+  not be available in future releases.
+
+  Returns:
+    an int indicating the time limit in milliseconds of the outbound
+    data buffering. Default is 0 (disabled)
+  """
+  experiments = pipeline_options.view_as(DebugOptions).experiments
+  experiments = experiments if experiments else []
+
+  for experiment in experiments:
+    # There should only be 1 match so returning from the loop
+    if re.match(r'data_buffer_time_limit_ms=', experiment):
+      return int(
+          re.match(
+              r'data_buffer_time_limit_ms=(?P<data_buffer_time_limit_ms>.*)',
Review comment:
Makes sense to me. I have created a follow-up PR,
https://github.com/apache/beam/pull/10505. It renames the config
key "beam_fn_api_data_buffer_time_limit" in the Java SDK harness to
"data_buffer_time_limit_ms". Please note that the config key
"beam_fn_api_data_buffer_size_limit" in the Java SDK harness is also renamed to
"data_buffer_size_limit" so that the naming convention is consistent.
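For illustration only, here is a minimal sketch of how a "key=value" experiment such as "data_buffer_time_limit_ms=1000" could be read from a list of experiments. It is not the code from either pull request: the helper name parse_int_experiment and its default argument are hypothetical, and it assumes the value is a non-negative integer.

```python
import re


def parse_int_experiment(experiments, key, default=0):
  """Returns the int value of the first 'key=<digits>' entry, else default.

  Hypothetical helper for illustration; not part of the Beam SDK.
  """
  # Anchor the pattern so that only an exact 'key=<digits>' entry matches.
  pattern = re.compile(r'^%s=(?P<value>\d+)$' % re.escape(key))
  for experiment in experiments or []:
    match = pattern.match(experiment)
    if match:
      # Only one entry is expected, so return on the first match.
      return int(match.group('value'))
  return default


limit_ms = parse_int_experiment(
    ['beam_fn_api', 'data_buffer_time_limit_ms=1000'],
    'data_buffer_time_limit_ms')
```

Compiling the pattern once and matching a single time per entry avoids running the same regular expression twice, and a malformed value falls back to the default instead of raising.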
Issue Time Tracking
-------------------
Worklog Id: (was: 366496)
Time Spent: 4h 20m (was: 4h 10m)
> Add time-based cache threshold support in the data service of the Python SDK
> harness
> ------------------------------------------------------------------------------------
>
> Key: BEAM-7949
> URL: https://issues.apache.org/jira/browse/BEAM-7949
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-harness
> Reporter: sunjincheng
> Assignee: sunjincheng
> Priority: Major
> Fix For: 2.19.0
>
> Time Spent: 4h 20m
> Remaining Estimate: 0h
>
> Currently, only a size-based cache threshold is supported in the data service
> of the Python SDK harness. It should also support a time-based cache
> threshold. This is especially important for streaming jobs, which are
> sensitive to latency.
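
To make the request concrete, the following is a rough sketch of a buffer that flushes when either a size threshold or a time threshold is reached. It is an assumption-laden illustration, not the data service implementation: the class BufferedSender, its parameters, and the injectable clock are all hypothetical, and the time check only runs when write() or maybe_flush() is called (a real harness would likely need a periodic timer as well).

```python
import time


class BufferedSender(object):
  """Hypothetical buffer that flushes on a size limit or a time limit."""

  def __init__(self, send, size_limit_bytes, time_limit_ms=0, clock=time.time):
    self._send = send                       # callable that receives the bytes
    self._size_limit = size_limit_bytes
    self._time_limit_s = time_limit_ms / 1000.0  # 0 disables the time limit
    self._clock = clock
    self._chunks = []
    self._size = 0
    self._first_write = None                # time of the oldest buffered chunk

  def write(self, data):
    if not self._chunks:
      self._first_write = self._clock()
    self._chunks.append(data)
    self._size += len(data)
    self.maybe_flush()

  def maybe_flush(self):
    """Flushes if the buffer is too large or its oldest chunk is too old."""
    if not self._chunks:
      return
    too_big = self._size >= self._size_limit
    too_old = (self._time_limit_s > 0 and
               self._clock() - self._first_write >= self._time_limit_s)
    if too_big or too_old:
      self.flush()

  def flush(self):
    if self._chunks:
      self._send(b''.join(self._chunks))
    self._chunks = []
    self._size = 0
    self._first_write = None
```

With only the size threshold, a slow trickle of small elements could sit in the buffer indefinitely; the time threshold bounds how long the oldest element waits, which is the latency concern raised for streaming jobs.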