[ 
https://issues.apache.org/jira/browse/BEAM-13734?focusedWorklogId=715917&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-715917
 ]

ASF GitHub Bot logged work on BEAM-13734:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Jan/22 19:06
            Start Date: 26/Jan/22 19:06
    Worklog Time Spent: 10m 
      Work Description: KevinGG commented on a change in pull request #16601:
URL: https://github.com/apache/beam/pull/16601#discussion_r792924698



##########
File path: sdks/python/apache_beam/runners/interactive/background_caching_job.py
##########
@@ -232,12 +232,12 @@ def has_source_to_cache(user_pipeline):
 
       file_based_cm = ie.current_env().get_cache_manager(user_pipeline)
       cache_dir = file_based_cm._cache_dir
-      from apache_beam.runners.interactive import interactive_beam as ib
-      if ib.options.cache_root:
-        #TODO(victorhc): Handle the case when the path starts with "gs://"
-        if ib.options.cache_root.startswith("gs://"):
-          raise ValueError("GCS paths are not currently supported.")
-        cache_dir = ib.options.cache_root
+      if ie.current_env().options.cache_root:

Review comment:
       This seems verbose. How about:
   
   ```
   cache_root = ie.current_env().options.cache_root
   if cache_root:
     ...
     if cache_root.startswith('gs://'):
       raise ...
     cache_dir = cache_root
   ```
   
   Also, nit: use single quotes wherever possible to conform to the existing
code style.

##########
File path: 
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -359,12 +363,16 @@ def get_cache_manager(self, pipeline, 
create_if_absent=False):
     manager for the pipeline."""
     cache_manager = self._cache_managers.get(str(id(pipeline)), None)
     if not cache_manager and create_if_absent:
-      from apache_beam.runners.interactive import interactive_beam as ib
-      if ib.options.cache_root:
-        #TODO(victorhc): Handle the case when the path starts with "gs://"
-        if ib.options.cache_root.startswith("gs://"):
-          raise ValueError("GCS paths are not currently supported.")
-        cache_dir = tempfile.mkdtemp(dir=ib.options.cache_root)
+      cache_root = self.options.cache_root
+      if cache_root:
+        if cache_root.startswith("gs://"):
+          cache_root_path = PurePath(cache_root)
+          bucket_name = cache_root_path.parts[1]
+          self.check_bucket_exists(bucket_name)
+          cache_dir = "gs://"+"/".join(cache_root_path.parts[1:]) \
+                    + "/" + str(id(pipeline))

Review comment:
       You can format the string with `str.format`:
   
   ```
   # Note how the function call spans multiple lines; please let yapf do the
   # final formatting, though.
   'gs://{}/{}'.format(
       '/'.join(cache_root_path.parts[1:]),
       id(pipeline))
   ```
   
   This avoids the backslash line continuation and improves readability.
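
   A minimal runnable sketch of that formatting, for illustration only. The
   helper name `gcs_cache_dir` is hypothetical, and `PurePosixPath` is used
   here (rather than `PurePath`) as an assumption so that the parts are split
   the same way on every platform:

```python
from pathlib import PurePosixPath


def gcs_cache_dir(cache_root: str, pipeline_id: int) -> str:
  """Builds 'gs://<bucket>/<prefix>/<pipeline_id>' from a gs:// cache root."""
  # PurePosixPath collapses the '//' after the scheme, so parts[0] is 'gs:'
  # and parts[1:] holds the bucket name followed by any sub-directories.
  parts = PurePosixPath(cache_root).parts
  return 'gs://{}/{}'.format('/'.join(parts[1:]), pipeline_id)


# In the PR the last component would be id(pipeline); a fixed number is used
# here so the result is reproducible.
print(gcs_cache_dir('gs://my-gcs-bucket/cache/dir', 12345))
# prints: gs://my-gcs-bucket/cache/dir/12345
```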

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -236,6 +236,9 @@ def cache_root(self, value):
 
     Example of local directory usage::
       interactive_beam.options.cache_root = "/Users/username/my/cache/dir"
+
+    Example of GCS directory usage::

Review comment:
       nit: let's use single quotes here too.

##########
File path: 
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -675,3 +683,25 @@ def get_sql_chain(self, pipeline, set_user_pipeline=False):
             pipeline)
       chain.user_pipeline = pipeline
     return chain
+
+  def check_bucket_exists(self, bucket_name):
+    try:
+      from apitools.base.py.exceptions import HttpError
+    except:
+      raise ImportError('Could not import HttpError from apitools.')

Review comment:
       +1, if moving to the top, please follow the example: 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/gcsio.py#L57
   
   Or, see if the ImportError can be merged into the `unable to verify whether 
bucket {} exists` warning branch because you might not want the import error to 
fail the whole module.
   
   I also think this function could be moved to the utils.py module: 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/utils.py
   
   Please add a docstring and typehints to this function so that we know what 
it does.
   Things I'm not sure about:
   
   - If the function is named `check_...`, shouldn't it return True/False like 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/gcsio.py#L461?
 But then how would a bool value differentiate "does not exist" from "not able 
to check"?
   - If you rename the function to `assert_...`, so that it only asserts that 
the bucket exists without returning a value, you may also need to add a 
`raise`:
   
   ```
   except HttpError as e:
     if e.status_code == 404:
       _LOGGER.error('%s bucket does not exist!', bucket_name)
       raise
     else:
   ```
   
   Otherwise, the 404 branch only logs and the caller never sees the failure.
   
   Please also add a unit test to this function for different scenarios.
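
   One possible shape for the `assert_...` variant, as a sketch only. To keep
   it runnable without GCP dependencies, `HttpError` below is a local stand-in
   for the apitools exception, and the `get_bucket` parameter is a hypothetical
   injection point for the real storage client call:

```python
import logging
from typing import Callable

_LOGGER = logging.getLogger(__name__)


class HttpError(Exception):
  """Stand-in for apitools' HttpError so the sketch runs without GCP deps."""
  def __init__(self, status_code: int):
    super().__init__('HTTP %d' % status_code)
    self.status_code = status_code


def assert_bucket_exists(
    bucket_name: str, get_bucket: Callable[[str], object]) -> None:
  """Raises if the bucket is known to be missing; warns if unverifiable.

  Args:
    bucket_name: name of the GCS bucket, without the 'gs://' prefix.
    get_bucket: hypothetical callable that performs the lookup and raises
      HttpError on failure (the real code would call the storage client).
  """
  try:
    get_bucket(bucket_name)
  except HttpError as e:
    if e.status_code == 404:
      _LOGGER.error('%s bucket does not exist!', bucket_name)
      raise  # Propagate so the caller actually sees the failure.
    # Any other HTTP error means the check itself could not be completed.
    _LOGGER.warning(
        'HttpError - unable to verify whether bucket %s exists.', bucket_name)
```

   With this shape, a unit test can pass in fake `get_bucket` callables that
   succeed, raise a 404, or raise another status, which covers the "exists",
   "does not exist" and "not able to check" scenarios separately.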
   

##########
File path: 
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -675,3 +683,25 @@ def get_sql_chain(self, pipeline, set_user_pipeline=False):
             pipeline)
       chain.user_pipeline = pipeline
     return chain
+
+  def check_bucket_exists(self, bucket_name):
+    try:
+      from apitools.base.py.exceptions import HttpError
+    except:
+      raise ImportError('Could not import HttpError from apitools.')
+
+    try:
+      storage_client = storage.StorageV1(
+          credentials=auth.get_service_credentials(),
+          get_credentials=False,
+          http=get_new_http(),
+          response_encoding='utf8')
+      request = storage.StorageBucketsGetRequest(bucket=bucket_name)
+      storage_client.buckets.Get(request)
+    except HttpError as e:
+      if e.status_code == 404:
+        _LOGGER.error("{} bucket does not exist!".format(bucket_name))
+      else:
+        _LOGGER.warning(
+            "HttpError - unable to verify whether bucket {} exists.\
+        ".format(bucket_name))

Review comment:
       Please do not pre-format messages in logging statements. Pre-formatting 
always runs, even when the logger configuration discards the record, so it can 
waste resources.
   
   Instead, pass the values as arguments and let the logger format them:
   
   ```
   _LOGGER.level('template %s message', arg1, arg2, arg3, ...)
   ```
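
   To illustrate the difference, here is a small self-contained sketch. The
   logger name and the `CountingName` helper are made up for the demonstration;
   the helper only counts how often it gets rendered to a string:

```python
import logging

_LOGGER = logging.getLogger('lazy_logging_demo')
_LOGGER.setLevel(logging.ERROR)  # WARNING records are dropped.


class CountingName:
  """Records how many times it is rendered to a string."""
  def __init__(self, name):
    self.name = name
    self.renders = 0

  def __str__(self):
    self.renders += 1
    return self.name


bucket = CountingName('my-gcs-bucket')

# Pre-formatted: str.format() runs before the logger checks the level.
_LOGGER.warning('unable to verify whether bucket {} exists.'.format(bucket))
eager_renders = bucket.renders

# Deferred: the logger drops the record first, so %s is never expanded.
_LOGGER.warning('unable to verify whether bucket %s exists.', bucket)
lazy_renders = bucket.renders - eager_renders

print(eager_renders, lazy_renders)  # prints: 1 0
```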

##########
File path: 
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -359,12 +363,16 @@ def get_cache_manager(self, pipeline, 
create_if_absent=False):
     manager for the pipeline."""
     cache_manager = self._cache_managers.get(str(id(pipeline)), None)
     if not cache_manager and create_if_absent:
-      from apache_beam.runners.interactive import interactive_beam as ib
-      if ib.options.cache_root:
-        #TODO(victorhc): Handle the case when the path starts with "gs://"
-        if ib.options.cache_root.startswith("gs://"):
-          raise ValueError("GCS paths are not currently supported.")
-        cache_dir = tempfile.mkdtemp(dir=ib.options.cache_root)
+      cache_root = self.options.cache_root
+      if cache_root:
+        if cache_root.startswith("gs://"):
+          cache_root_path = PurePath(cache_root)
+          bucket_name = cache_root_path.parts[1]

Review comment:
       Let's also check `len(cache_root_path.parts) < 2` and raise a 
descriptive error when the GCS path is invalid (too short), instead of letting 
it fail with an index-out-of-range exception.
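
   A rough sketch of such a check, assuming the path is split with
   `PurePosixPath`. The function name `bucket_name_from_cache_root` and the
   error message are hypothetical:

```python
from pathlib import PurePosixPath


def bucket_name_from_cache_root(cache_root: str) -> str:
  """Returns the bucket name of a gs:// path or raises ValueError."""
  # For 'gs://bucket/dir' the parts are ('gs:', 'bucket', 'dir'); a bare
  # 'gs://' yields only ('gs:',), so there is no bucket to read.
  parts = PurePosixPath(cache_root).parts
  if len(parts) < 2:
    raise ValueError('Invalid GCS path (no bucket name): %r' % cache_root)
  return parts[1]


print(bucket_name_from_cache_root('gs://my-gcs-bucket/cache/dir'))
# prints: my-gcs-bucket
```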




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 715917)
    Time Spent: 3h 20m  (was: 3h 10m)

> Support cache directories that use GCS buckets
> ----------------------------------------------
>
>                 Key: BEAM-13734
>                 URL: https://issues.apache.org/jira/browse/BEAM-13734
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-py-interactive
>            Reporter: Victor Chen
>            Assignee: Victor Chen
>            Priority: P2
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> * Builds off of the work accomplished under BEAM-13685
>  * Modified interactive_environment.py to support caching to a bucket on GCS 
> for batch processing pipelines
>  * If a specified bucket does not exist, the pipeline will terminate and 
> return an error specifying that the bucket does not exist
>  * Added cleanup() functionality to cache_manager.py, to enable the 
> FileBasedCacheManager class to automatically delete cached values on GCS when 
> a bucket path is specified
>  * Added docstring to interactive_beam.py with an example of GCS path 
> assignment
>  * Cached files on GCS will be stored under a directory represented by the 
> value of id(pipeline).
>  ** Example cached path: gs://my-gcs-bucket/cache/dir/id(pipeline)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
