tvalentyn commented on a change in pull request #16448:
URL: https://github.com/apache/beam/pull/16448#discussion_r783453023
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -233,10 +240,17 @@ def create_job_resources(options, # type: PipelineOptions
resources.append(Stager._create_file_pip_requirements_artifact(tf.name))
# Populate cache with packages from PyPI requirements and stage
# the files in the cache.
- (
- populate_requirements_cache if populate_requirements_cache else
- Stager._populate_requirements_cache)(
- tf.name, requirements_cache_path)
+ if setup_options.view_as(WorkerOptions).sdk_container_image is None:
Review comment:
You should be passing `tf.name` instead of `setup_options.requirements` to
preserve the current behavior.
Also, I am not sure why the `if pypi_requirements:` branch was necessary in
the first place; I asked on https://github.com/apache/beam/pull/13607.
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -221,10 +221,17 @@ def create_job_resources(options, # type: PipelineOptions
setup_options.requirements_file, REQUIREMENTS_FILE))
# Populate cache with packages from the requirement file option and
# stage the files in the cache.
- (
- populate_requirements_cache if populate_requirements_cache else
- Stager._populate_requirements_cache)(
- setup_options.requirements_file, requirements_cache_path)
+ if setup_options.view_as(WorkerOptions).sdk_container_image is None:
+ # downloads the binary distributions
+ (
+ populate_requirements_cache if populate_requirements_cache else
Review comment:
Could you please correct the type hint for
`populate_requirements_cache=None, # type: Optional[str]`? It should be a
callable.
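
For illustration, a minimal sketch of what a callable type hint could look like. It assumes the callable is invoked with the requirements file path and the cache directory, as in the call shown in the diff. The alias `PopulateCacheFn` and the functions `_default_populate` and `stage_requirements` are hypothetical names for this sketch, not existing Beam names.

```python
from typing import Callable, Optional

# Hypothetical alias: the callable is invoked as fn(requirements_file, cache_dir).
PopulateCacheFn = Callable[[str, str], None]


def _default_populate(requirements_file, cache_dir):
  # type: (str, str) -> None
  """Stand-in for Stager._populate_requirements_cache; does nothing here."""


def stage_requirements(
    requirements_file,  # type: str
    cache_dir,  # type: str
    populate_requirements_cache=None,  # type: Optional[PopulateCacheFn]
):
  # type: (...) -> None
  """Calls the injected populate function, or the default when it is None."""
  populate = populate_requirements_cache or _default_populate
  populate(requirements_file, cache_dir)
```

With `Optional[str]`, a type checker would reject the call `populate(requirements_file, cache_dir)`; with the callable hint it is accepted.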
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -221,10 +221,17 @@ def create_job_resources(options, # type: PipelineOptions
setup_options.requirements_file, REQUIREMENTS_FILE))
# Populate cache with packages from the requirement file option and
# stage the files in the cache.
- (
- populate_requirements_cache if populate_requirements_cache else
- Stager._populate_requirements_cache)(
- setup_options.requirements_file, requirements_cache_path)
+ if setup_options.view_as(WorkerOptions).sdk_container_image is None:
+ # downloads the binary distributions
Review comment:
If the user passes a custom image, we should log a warning that advises
against also passing `--requirements_file`.
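
A rough sketch of such a check, under the assumption that both values are passed in as plain arguments (in the PR they would come from `WorkerOptions.sdk_container_image` and `SetupOptions.requirements_file`). The function name and the warning text are hypothetical.

```python
import logging

_LOGGER = logging.getLogger(__name__)


def warn_if_requirements_with_custom_image(
    sdk_container_image, requirements_file):
  """Logs a warning when both a custom image and a requirements file are set.

  Returns True if a warning was logged, False otherwise.
  """
  if sdk_container_image is not None and requirements_file:
    _LOGGER.warning(
        'A custom SDK container image (%s) was specified together with '
        '--requirements_file. Consider installing the dependencies in the '
        'image instead.',
        sdk_container_image)
    return True
  return False
```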
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -221,10 +221,17 @@ def create_job_resources(options, # type: PipelineOptions
setup_options.requirements_file, REQUIREMENTS_FILE))
# Populate cache with packages from the requirement file option and
# stage the files in the cache.
- (
- populate_requirements_cache if populate_requirements_cache else
- Stager._populate_requirements_cache)(
- setup_options.requirements_file, requirements_cache_path)
+ if setup_options.view_as(WorkerOptions).sdk_container_image is None:
+ # downloads the binary distributions
+ (
+ populate_requirements_cache if populate_requirements_cache else
+ Stager._populate_requirements_cache_with_whl)(
Review comment:
I would keep one method, `_populate_requirements_cache`, and add a
flag such as `prefer_bdists=True/False` (or `fetch_binary`, to match other
flags) to avoid code duplication.
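
Something along these lines, as a sketch only. It builds the `pip download` command for a requirements file and returns it instead of running it. The function names are hypothetical, and the exact set of pip arguments that Beam passes today is not shown in this diff, so the shared arguments below are an assumption.

```python
import sys


def populate_requirements_cache_cmd(
    requirements_file, cache_dir, fetch_binary=False):
  """Builds one pip command for both modes instead of two near-copies."""
  cmd_args = [
      sys.executable,
      '-m',
      'pip',
      'download',
      '--dest',
      cache_dir,
      '-r',
      requirements_file,
  ]
  # The only difference between the two modes is the distribution filter.
  if fetch_binary:
    cmd_args.extend(['--only-binary', ':all:'])
  else:
    cmd_args.extend(['--no-binary', ':all:'])
  return cmd_args


def choose_fetch_binary(sdk_container_image):
  """Mirrors the condition in the diff: wheels only without a custom image."""
  return sdk_container_image is None
```

The call site would then compute the flag once, for example `fetch_binary=choose_fetch_binary(image)`, and call the single method.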
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -884,3 +946,49 @@ def _download_pypi_sdk_package(
'Failed to download a distribution for the running SDK. '
'Expected either one of %s to be found in the download folder.' %
(expected_files))
+
+ @staticmethod
+ def _download_pypi_package(
+ temp_dir,
+ fetch_binary=False,
+ language_version_tag='27',
+ language_implementation_tag='cp',
+ abi_tag='cp27mu',
+ platform_tag='manylinux2014_x86_64',
+ package_name=None):
+ """Downloads SDK package from PyPI and returns path to local path."""
+ package_name = package_name or Stager.get_sdk_package_name()
+ cmd_args = [
+ Stager._get_python_executable(),
+ '-m',
+ 'pip',
+ 'download',
+ package_name,
+ '--dest',
+ temp_dir,
+ '--no-binary',
+ ':all:'
+ ]
+
+ if fetch_binary:
+ cmd_args.pop() # remove the no binary flag
Review comment:
I would make it part of the condition:
```
if fetch_binary:
  # add binary flags
else:
  # add no-binary flags
# run the command
```
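
Spelled out a bit more, as a sketch that assumes the tag parameters from the new method's signature map onto pip's `--python-version`, `--implementation`, `--abi` and `--platform` options. The function name `build_pip_download_cmd` is hypothetical, and it returns the command rather than running it.

```python
import sys


def build_pip_download_cmd(
    package_name,
    temp_dir,
    fetch_binary=False,
    language_version_tag='27',
    language_implementation_tag='cp',
    abi_tag='cp27mu',
    platform_tag='manylinux2014_x86_64'):
  """Returns the pip download command for a source or binary distribution."""
  cmd_args = [
      sys.executable, '-m', 'pip', 'download', package_name, '--dest', temp_dir
  ]
  if fetch_binary:
    # Binary flags: restrict to wheels matching the target interpreter.
    cmd_args.extend([
        '--only-binary',
        ':all:',
        '--python-version',
        language_version_tag,
        '--implementation',
        language_implementation_tag,
        '--abi',
        abi_tag,
        '--platform',
        platform_tag,
    ])
  else:
    # No-binary flags: force a source distribution.
    cmd_args.extend(['--no-binary', ':all:'])
  return cmd_args
```

This avoids the `cmd_args.pop()`, which silently depends on `':all:'` being the last element and would leave a dangling `--no-binary` in the list.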
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -815,9 +876,10 @@ def _download_pypi_sdk_package(
language_version_tag='27',
language_implementation_tag='cp',
abi_tag='cp27mu',
- platform_tag='manylinux1_x86_64'):
+ platform_tag='manylinux1_x86_64',
+ package_name=None):
"""Downloads SDK package from PyPI and returns path to local path."""
- package_name = Stager.get_sdk_package_name()
+ package_name = package_name or Stager.get_sdk_package_name()
Review comment:
Leftover?
Or did you mean to combine `_download_pypi_package` and
`_download_pypi_sdk_package` into one method (that is, you would pass
`package_name=Stager.get_sdk_package_name()` into `_download_pypi_package`
to download the SDK)? That would make sense, as it's mostly the same code. We
can remove the warning specific to the Apache Beam SDK package.