tvalentyn commented on code in PR #28781:
URL: https://github.com/apache/beam/pull/28781#discussion_r1349297511
##########
sdks/python/apache_beam/runners/worker/sdk_worker_main.py:
##########
@@ -239,24 +241,27 @@ def _parse_pipeline_options(options_json):
return PipelineOptions.from_dictionary(_load_pipeline_options(options_json))
-def _get_state_cache_size(experiments):
- """Defines the upper number of state items to cache.
-
- Note: state_cache_size is an experimental flag and might not be available in
- future releases.
+def _get_state_cache_size(options, experiments):
+ """Defines the maximum size of the cache in megabytes.
Returns:
an int indicating the maximum number of megabytes to cache.
Default is 0 MB
"""
+ state_cache_size = options.view_as(WorkerOptions).state_cache_size
- for experiment in experiments:
- # There should only be 1 match so returning from the loop
- if re.match(r'state_cache_size=', experiment):
- return int(
- re.match(r'state_cache_size=(?P<state_cache_size>.*)',
- experiment).group('state_cache_size')) << 20
- return 0
+ if not state_cache_size:
+ # to maintain backward compatibility
+ for experiment in experiments:
+ # There should only be 1 match so returning from the loop
+ if re.match(r'state_cache_size=', experiment):
+ return int(
+ re.match(r'state_cache_size=(?P<state_cache_size>.*)',
+ experiment).group('state_cache_size')) << 20
+
+ state_cache_size = 100
Review Comment:
to bake this change in dev SDKs for longer period i'd be open to have a PR
that changes 1 line to return 100MB here + *file an issue blocking the next
release to flesh out the rest of the logic.
##########
sdks/python/apache_beam/runners/worker/sdk_worker_main.py:
##########
@@ -239,24 +241,27 @@ def _parse_pipeline_options(options_json):
return PipelineOptions.from_dictionary(_load_pipeline_options(options_json))
-def _get_state_cache_size(experiments):
- """Defines the upper number of state items to cache.
-
- Note: state_cache_size is an experimental flag and might not be available in
- future releases.
+def _get_state_cache_size(options, experiments):
+ """Defines the maximum size of the cache in megabytes.
Returns:
an int indicating the maximum number of megabytes to cache.
Default is 0 MB
"""
+ state_cache_size = options.view_as(WorkerOptions).state_cache_size
- for experiment in experiments:
- # There should only be 1 match so returning from the loop
- if re.match(r'state_cache_size=', experiment):
- return int(
- re.match(r'state_cache_size=(?P<state_cache_size>.*)',
- experiment).group('state_cache_size')) << 20
- return 0
+ if not state_cache_size:
+ # to maintain backward compatibility
+ for experiment in experiments:
+ # There should only be 1 match so returning from the loop
+ if re.match(r'state_cache_size=', experiment):
+ return int(
+ re.match(r'state_cache_size=(?P<state_cache_size>.*)',
+ experiment).group('state_cache_size')) << 20
+
+ state_cache_size = 100
Review Comment:
to bake this change in dev SDKs for longer period i'd be open to have a PR
that changes 1 line to return 100MB here + file an issue blocking the next
release to flesh out the rest of the logic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]