tvalentyn commented on code in PR #28781:
URL: https://github.com/apache/beam/pull/28781#discussion_r1349295926


##########
sdks/python/apache_beam/runners/worker/sdk_worker_main.py:
##########
@@ -239,24 +241,27 @@ def _parse_pipeline_options(options_json):
   return PipelineOptions.from_dictionary(_load_pipeline_options(options_json))
 
 
-def _get_state_cache_size(experiments):
-  """Defines the upper number of state items to cache.
-
-  Note: state_cache_size is an experimental flag and might not be available in
-  future releases.
+def _get_state_cache_size(options, experiments):
+  """Defines the maximum size of the cache in megabytes.

Review Comment:
   there is an inaccuracy between the docstring and returned value. let's add 
units to the function and variable names, such as 
   
   `_get_state_cache_size_bytes`
   
   `state_cache_size_mb`



##########
sdks/python/apache_beam/runners/worker/sdk_worker_main.py:
##########
@@ -239,24 +241,27 @@ def _parse_pipeline_options(options_json):
   return PipelineOptions.from_dictionary(_load_pipeline_options(options_json))
 
 
-def _get_state_cache_size(experiments):
-  """Defines the upper number of state items to cache.
-
-  Note: state_cache_size is an experimental flag and might not be available in
-  future releases.
+def _get_state_cache_size(options, experiments):
+  """Defines the maximum size of the cache in megabytes.
 
   Returns:
     an int indicating the maximum number of megabytes to cache.
       Default is 0 MB
   """
+  state_cache_size = options.view_as(WorkerOptions).state_cache_size
 
-  for experiment in experiments:
-    # There should only be 1 match so returning from the loop
-    if re.match(r'state_cache_size=', experiment):
-      return int(
-          re.match(r'state_cache_size=(?P<state_cache_size>.*)',
-                   experiment).group('state_cache_size')) << 20
-  return 0
+  if not state_cache_size:

Review Comment:
   let's cover these branches in unit tests.



##########
sdks/python/apache_beam/runners/worker/sdk_worker_main.py:
##########
@@ -239,24 +241,27 @@ def _parse_pipeline_options(options_json):
   return PipelineOptions.from_dictionary(_load_pipeline_options(options_json))
 
 
-def _get_state_cache_size(experiments):
-  """Defines the upper number of state items to cache.
-
-  Note: state_cache_size is an experimental flag and might not be available in
-  future releases.
+def _get_state_cache_size(options, experiments):
+  """Defines the maximum size of the cache in megabytes.
 
   Returns:
     an int indicating the maximum number of megabytes to cache.
       Default is 0 MB
   """
+  state_cache_size = options.view_as(WorkerOptions).state_cache_size
 
-  for experiment in experiments:
-    # There should only be 1 match so returning from the loop
-    if re.match(r'state_cache_size=', experiment):
-      return int(
-          re.match(r'state_cache_size=(?P<state_cache_size>.*)',
-                   experiment).group('state_cache_size')) << 20
-  return 0
+  if not state_cache_size:
+    # to maintain backward compatibility
+    for experiment in experiments:

Review Comment:
   pretty sure there is already a helper that does this parsing.



##########
sdks/python/apache_beam/runners/worker/sdk_worker_main.py:
##########
@@ -239,24 +241,27 @@ def _parse_pipeline_options(options_json):
   return PipelineOptions.from_dictionary(_load_pipeline_options(options_json))
 
 
-def _get_state_cache_size(experiments):
-  """Defines the upper number of state items to cache.
-
-  Note: state_cache_size is an experimental flag and might not be available in
-  future releases.
+def _get_state_cache_size(options, experiments):
+  """Defines the maximum size of the cache in megabytes.
 
   Returns:
     an int indicating the maximum number of megabytes to cache.
       Default is 0 MB
   """
+  state_cache_size = options.view_as(WorkerOptions).state_cache_size
 
-  for experiment in experiments:
-    # There should only be 1 match so returning from the loop
-    if re.match(r'state_cache_size=', experiment):
-      return int(
-          re.match(r'state_cache_size=(?P<state_cache_size>.*)',
-                   experiment).group('state_cache_size')) << 20
-  return 0
+  if not state_cache_size:
+    # to maintain backward compatibility
+    for experiment in experiments:
+      # There should only be 1 match so returning from the loop
+      if re.match(r'state_cache_size=', experiment):
+        return int(
+            re.match(r'state_cache_size=(?P<state_cache_size>.*)',
+                     experiment).group('state_cache_size')) << 20
+
+    state_cache_size = 100

Review Comment:
   to bake this change in dev SDKs for longer period i'd be open to have a PR 
that changes 1 line to return 100MB here  + a follow issue before release cut 
to flesh out the rest of the logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to