[
https://issues.apache.org/jira/browse/BEAM-13734?focusedWorklogId=715917&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-715917
]
ASF GitHub Bot logged work on BEAM-13734:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Jan/22 19:06
Start Date: 26/Jan/22 19:06
Worklog Time Spent: 10m
Work Description: KevinGG commented on a change in pull request #16601:
URL: https://github.com/apache/beam/pull/16601#discussion_r792924698
##########
File path: sdks/python/apache_beam/runners/interactive/background_caching_job.py
##########
@@ -232,12 +232,12 @@ def has_source_to_cache(user_pipeline):
file_based_cm = ie.current_env().get_cache_manager(user_pipeline)
cache_dir = file_based_cm._cache_dir
- from apache_beam.runners.interactive import interactive_beam as ib
- if ib.options.cache_root:
- #TODO(victorhc): Handle the case when the path starts with "gs://"
- if ib.options.cache_root.startswith("gs://"):
- raise ValueError("GCS paths are not currently supported.")
- cache_dir = ib.options.cache_root
+ if ie.current_env().options.cache_root:
Review comment:
This seems verbose. How about:
```
cache_root = ie.current_env().options.cache_root
if cache_root:
  ...
  if cache_root.startswith('gs://'):
    raise ...
  cache_dir = cache_root
```
Also nit: use single quotes whenever you can to conform to the existing code
style.
##########
File path:
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -359,12 +363,16 @@ def get_cache_manager(self, pipeline,
create_if_absent=False):
manager for the pipeline."""
cache_manager = self._cache_managers.get(str(id(pipeline)), None)
if not cache_manager and create_if_absent:
- from apache_beam.runners.interactive import interactive_beam as ib
- if ib.options.cache_root:
- #TODO(victorhc): Handle the case when the path starts with "gs://"
- if ib.options.cache_root.startswith("gs://"):
- raise ValueError("GCS paths are not currently supported.")
- cache_dir = tempfile.mkdtemp(dir=ib.options.cache_root)
+ cache_root = self.options.cache_root
+ if cache_root:
+ if cache_root.startswith("gs://"):
+ cache_root_path = PurePath(cache_root)
+ bucket_name = cache_root_path.parts[1]
+ self.check_bucket_exists(bucket_name)
+ cache_dir = "gs://"+"/".join(cache_root_path.parts[1:]) \
+ + "/" + str(id(pipeline))
Review comment:
You can format the string like this:
```
# Note how the function call spans multiple lines; please use yapf to
# format it, though.
'gs://{}/{}'.format(
    '/'.join(cache_root_path.parts[1:]),
    id(pipeline))
```
to avoid the backslash line continuation and improve readability.
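For reference, a minimal sketch of the suggested formatting as a standalone function. `build_gcs_cache_dir` and the bucket name in the usage note are made up for illustration, and `PurePosixPath` is an assumption used here (instead of `PurePath`) so that the split does not depend on the OS:

```python
from pathlib import PurePosixPath


def build_gcs_cache_dir(cache_root, pipeline_id):
  """Joins a gs:// cache root and a pipeline id into one GCS path.

  Hypothetical helper for illustration only; it is not part of the PR.
  """
  cache_root_path = PurePosixPath(cache_root)
  # parts[0] is the 'gs:' scheme; the remaining parts are the bucket name
  # followed by any directory components.
  return 'gs://{}/{}'.format(
      '/'.join(cache_root_path.parts[1:]), pipeline_id)
```

Calling `build_gcs_cache_dir('gs://my-gcs-bucket/cache/dir', 123)` yields `'gs://my-gcs-bucket/cache/dir/123'`, the same string the concatenation in the diff would produce.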
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -236,6 +236,9 @@ def cache_root(self, value):
Example of local directory usage::
interactive_beam.options.cache_root = "/Users/username/my/cache/dir"
+
+ Example of GCS directory usage::
Review comment:
nit: let's use single quotes here too.
##########
File path:
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -675,3 +683,25 @@ def get_sql_chain(self, pipeline, set_user_pipeline=False):
pipeline)
chain.user_pipeline = pipeline
return chain
+
+ def check_bucket_exists(self, bucket_name):
+ try:
+ from apitools.base.py.exceptions import HttpError
+ except:
+ raise ImportError('Could not import HttpError from apitools.')
Review comment:
+1. If you move the import to the top, please follow this example:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/gcsio.py#L57
Alternatively, see whether the ImportError can be folded into the
`unable to verify whether bucket {} exists` warning branch, because you might
not want an import error to fail the whole module.
I also think this function could be moved to the utils.py module:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/utils.py
Please add a docstring and type hints to this function so that we know what
it does.
Things I'm not sure about:
- If the function is named `check_...`, shouldn't it return True/False like
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/gcsio.py#L461?
But how would a bool value differentiate "does not exist" from "not able to
check"?
- If you rename the function to `assert_...`, so that it only asserts that the
bucket exists and returns nothing, you also need to add a `raise`:
```
except HttpError as e:
  if e.status_code == 404:
    _LOGGER.error('%s bucket does not exist!', bucket_name)
    raise
  else:
    ...
```
Otherwise, the error branch won't take effect.
Please also add unit tests for this function that cover the different scenarios.
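To make the `assert_...` option concrete, here is a rough sketch under stated assumptions, not a drop-in implementation. `FakeHttpError` is a hypothetical stand-in for the apitools `HttpError` (only `status_code` is modeled), and `get_bucket` is a hypothetical callable standing in for the storage client lookup, so the snippet runs without GCP dependencies. A missing bucket raises, an unverifiable bucket only warns, and an existing bucket returns nothing:

```python
import logging
from typing import Callable

_LOGGER = logging.getLogger(__name__)


class FakeHttpError(Exception):
  """Hypothetical stand-in for apitools' HttpError (status_code only)."""
  def __init__(self, status_code: int):
    super().__init__('HTTP {}'.format(status_code))
    self.status_code = status_code


def assert_bucket_exists(
    bucket_name: str, get_bucket: Callable[[str], object]) -> None:
  """Raises ValueError if the bucket is known not to exist.

  Args:
    bucket_name: name of the GCS bucket to look up.
    get_bucket: hypothetical callable that performs the lookup and raises
      FakeHttpError on failure (a stand-in for the real storage client).
  """
  try:
    get_bucket(bucket_name)
  except FakeHttpError as e:
    if e.status_code == 404:
      _LOGGER.error('%s bucket does not exist!', bucket_name)
      raise ValueError(
          'Invalid GCS bucket: {}'.format(bucket_name)) from e
    # Any other status means existence is unknown: warn, but do not fail.
    _LOGGER.warning(
        'HttpError - unable to verify whether bucket %s exists.',
        bucket_name)
```

Injecting the lookup as a callable also makes the three scenarios (exists, 404, other error) easy to cover in unit tests without network access.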
##########
File path:
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -675,3 +683,25 @@ def get_sql_chain(self, pipeline, set_user_pipeline=False):
pipeline)
chain.user_pipeline = pipeline
return chain
+
+ def check_bucket_exists(self, bucket_name):
+ try:
+ from apitools.base.py.exceptions import HttpError
+ except:
+ raise ImportError('Could not import HttpError from apitools.')
+
+ try:
+ storage_client = storage.StorageV1(
+ credentials=auth.get_service_credentials(),
+ get_credentials=False,
+ http=get_new_http(),
+ response_encoding='utf8')
+ request = storage.StorageBucketsGetRequest(bucket=bucket_name)
+ storage_client.buckets.Get(request)
+ except HttpError as e:
+ if e.status_code == 404:
+ _LOGGER.error("{} bucket does not exist!".format(bucket_name))
+ else:
+ _LOGGER.warning(
+ "HttpError - unable to verify whether bucket {} exists.\
+ ".format(bucket_name))
Review comment:
Please do not pre-format messages in logging statements. Pre-formatting
always runs, even when the logger configuration discards the record, so it
can waste resources.
Instead, pass the values as arguments and let the logger format them:
```
_LOGGER.level('template %s message', arg1, arg2, arg3, ...)
```
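A small self-contained sketch of the difference, using only the standard `logging` module. `CountingStr` and the logger name are made up for illustration; the class just counts how many times its value is rendered to a string:

```python
import logging

_LOGGER = logging.getLogger('lazy_logging_demo')
_LOGGER.setLevel(logging.ERROR)  # WARNING records are dropped.


class CountingStr:
  """Counts how often it is rendered, to make the formatting cost visible."""
  def __init__(self, value):
    self.value = value
    self.renders = 0

  def __str__(self):
    self.renders += 1
    return self.value


bucket_name = CountingStr('my-gcs-bucket')

# Pre-formatted: the message is built before the logger checks the level.
_LOGGER.warning('unable to verify bucket {}'.format(bucket_name))
eager_renders = bucket_name.renders

# Lazy: the logger checks the level first and never renders the argument.
_LOGGER.warning('unable to verify bucket %s', bucket_name)
lazy_renders = bucket_name.renders - eager_renders
```

Here `eager_renders` ends up as 1 and `lazy_renders` as 0: the pre-formatted call pays for the string even though nothing is logged.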
##########
File path:
sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -359,12 +363,16 @@ def get_cache_manager(self, pipeline,
create_if_absent=False):
manager for the pipeline."""
cache_manager = self._cache_managers.get(str(id(pipeline)), None)
if not cache_manager and create_if_absent:
- from apache_beam.runners.interactive import interactive_beam as ib
- if ib.options.cache_root:
- #TODO(victorhc): Handle the case when the path starts with "gs://"
- if ib.options.cache_root.startswith("gs://"):
- raise ValueError("GCS paths are not currently supported.")
- cache_dir = tempfile.mkdtemp(dir=ib.options.cache_root)
+ cache_root = self.options.cache_root
+ if cache_root:
+ if cache_root.startswith("gs://"):
+ cache_root_path = PurePath(cache_root)
+ bucket_name = cache_root_path.parts[1]
Review comment:
Let's also check `len(cache_root_path.parts) < 2` and raise a descriptive
error when the GCS path is invalid (too short), instead of letting
`cache_root_path.parts[1]` raise an IndexError.
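Something along these lines, as a sketch only. `extract_bucket_name` is a hypothetical helper name, and `PurePosixPath` is an assumption used here (rather than `PurePath`) so that the parsing is the same on every OS:

```python
from pathlib import PurePosixPath


def extract_bucket_name(cache_root):
  """Returns the bucket name of a gs:// path, or raises ValueError."""
  parts = PurePosixPath(cache_root).parts
  # 'gs://' alone parses to ('gs:',), so fewer than 2 parts means the path
  # has no bucket component.
  if len(parts) < 2:
    raise ValueError(
        'Invalid GCS path {!r}: expected gs://<bucket>[/<dir>].'.format(
            cache_root))
  return parts[1]
```

With this, `extract_bucket_name('gs://my-gcs-bucket/cache/dir')` returns `'my-gcs-bucket'`, while `extract_bucket_name('gs://')` raises a ValueError that names the offending path.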
Issue Time Tracking
-------------------
Worklog Id: (was: 715917)
Time Spent: 3h 20m (was: 3h 10m)
> Support cache directories that use GCS buckets
> ----------------------------------------------
>
> Key: BEAM-13734
> URL: https://issues.apache.org/jira/browse/BEAM-13734
> Project: Beam
> Issue Type: New Feature
> Components: runner-py-interactive
> Reporter: Victor Chen
> Assignee: Victor Chen
> Priority: P2
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
> * Builds off of the work accomplished under BEAM-13685
> * Modified interactive_environment.py to support caching to a bucket on GCS
> for batch processing pipelines
> * If a specified bucket does not exist, the pipeline will terminate and
> return an error specifying that the bucket does not exist
> * Added cleanup() functionality to cache_manager.py, to enable the
> FileBasedCacheManager class to automatically delete cached values on GCS when
> a bucket path is specified
> * Added docstring to interactive_beam.py with an example of GCS path
> assignment
> * Cached files on GCS will be stored under a directory represented by the
> value of id(pipeline).
> ** Example cached path: gs://my-gcs-bucket/cache/dir/id(pipeline)