tvalentyn commented on a change in pull request #16448:
URL: https://github.com/apache/beam/pull/16448#discussion_r785238296
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -221,10 +225,16 @@ def create_job_resources(options, # type: PipelineOptions
setup_options.requirements_file, REQUIREMENTS_FILE))
# Populate cache with packages from the requirement file option and
# stage the files in the cache.
+ if not fetch_binary: # display warning.
Review comment:
The inline comment is not necessary; the code is self-explanatory in
this case. Btw,
https://stackoverflow.blog/2021/12/23/best-practices-for-writing-code-comments/
is a good read. Rule 1 applies here.
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -656,25 +666,41 @@ def remove_dependency_from_requirements(
for i in range(len(lines)):
if not lines[i].startswith(dependency_to_remove):
tf.write(lines[i])
+ requirements_to_install.append(lines[i].strip())
- return tmp_requirements_filename
+ return tmp_requirements_filename, requirements_to_install
@staticmethod
@retry.with_exponential_backoff(
num_retries=4, retry_filter=retry_on_non_zero_exit)
- def _populate_requirements_cache(requirements_file, cache_dir):
- # The 'pip download' command will not download again if it finds the
- # tarball with the proper version already present.
- # It will get the packages downloaded in the order they are presented in
- # the requirements file and will download package dependencies.
-
- # The apache-beam dependency is excluded from requirements cache population
- # because we stage the SDK separately.
+ def _populate_requirements_cache(requirements_file, # type: str
+ cache_dir, # type: str
+ fetch_binary=False):
Review comment:
To reduce code complexity, I think we can omit the `fetch_binary` param from
this function. Currently, the only case where the `fetch_binary=False` branch is
used is custom container users who pass a requirements file. I think it's ok to
change the behavior for these users as well, so that we download sources
one-by-one from the requirements.txt file in this case, as opposed to
downloading sources for the entire requirements file at once.
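A minimal sketch of what the one-by-one approach could look like, assuming the
existing `pip download` flags (`--dest`, `--exists-action i`, `--no-binary :all:`)
are kept. The helper name `build_download_commands` is hypothetical and only
builds the argument lists; it does not run pip.

```python
import sys


def build_download_commands(requirement_lines, cache_dir):
  """Returns one `pip download` argument list per requirement line.

  Hypothetical helper for illustration; not part of stager.py.
  """
  commands = []
  for line in requirement_lines:
    requirement = line.strip()
    # Skip blank lines and comment lines in the requirements file.
    if not requirement or requirement.startswith('#'):
      continue
    commands.append([
        sys.executable, '-m', 'pip', 'download',
        '--dest', cache_dir,
        requirement,
        '--exists-action', 'i',
        # Request source distributions only.
        '--no-binary', ':all:',
    ])
  return commands
```

Each returned list could then be passed to `processes.check_output`, the same
way the current single command is.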
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -197,6 +198,9 @@ def create_job_resources(options, # type: PipelineOptions
resources = [] # type: List[beam_runner_api_pb2.ArtifactInformation]
setup_options = options.view_as(SetupOptions)
+ # True when sdk_container_image is apache beam image
+ fetch_binary = (
Review comment:
code might be a little more self-explanatory with:
```suggestion
has_default_container_image = (
```
then, we can use below: `fetch_binary=has_default_container_image` when
passing into a function.
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -162,7 +163,7 @@ def create_job_resources(options, # type: PipelineOptions
temp_dir, # type: str
build_setup_args=None, # type: Optional[List[str]]
pypi_requirements=None, # type: Optional[List[str]]
- populate_requirements_cache=None, # type: Optional[str]
+ populate_requirements_cache=None, # type: Optional[Callable]
Review comment:
Thanks. Let's include argument types. I think it's:
```suggestion
populate_requirements_cache=None, # type: Optional[Callable[[str, str], None]]
```
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -221,10 +225,16 @@ def create_job_resources(options, # type: PipelineOptions
setup_options.requirements_file, REQUIREMENTS_FILE))
# Populate cache with packages from the requirement file option and
# stage the files in the cache.
+ if not fetch_binary: # display warning.
+ _LOGGER.warning(
+ 'Avoid using requirements.txt when using a '
+ 'custom container image.')
Review comment:
Wording suggestion for the message:
When using a custom container image, prefer installing additional PyPI
dependencies directly into the image, instead of specifying them via runtime
options, such as --requirements_file.
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -824,32 +890,17 @@ def _download_pypi_sdk_package(
raise RuntimeError(
Review comment:
Let's remove the arguments that can be inferred (language version tag,
implementation tag, ABI tag, platform tag) from the function signature.
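For instance, the interpreter-related tags could be derived in one place
instead of being passed in. This is a sketch under the assumption that the
target is CPython, mirroring the logic already in the wheel-download branch;
the name `infer_wheel_tags` is hypothetical, and the platform tag is left out
here.

```python
import sys


def infer_wheel_tags(version_info=None):
  """Returns (implementation_tag, version_tag, abi_tag) for CPython.

  Hypothetical helper for illustration; defaults to the running interpreter.
  """
  if version_info is None:
    version_info = sys.version_info
  major, minor = version_info[0], version_info[1]
  implementation_tag = 'cp'  # Assumption: CPython only.
  version_tag = '%d%d' % (major, minor)
  # CPython wheels carry the 'm' ABI suffix only before Python 3.8.
  abi_suffix = 'm' if (major, minor) < (3, 8) else ''
  abi_tag = 'cp%s%s' % (version_tag, abi_suffix)
  return implementation_tag, version_tag, abi_tag
```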
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -394,7 +403,7 @@ def create_and_stage_job_resources(
build_setup_args=None, # type: Optional[List[str]]
temp_dir=None, # type: Optional[str]
pypi_requirements=None, # type: Optional[List[str]]
- populate_requirements_cache=None, # type: Optional[str]
+ populate_requirements_cache=None, # type: Optional[Callable]
Review comment:
Thanks. Let's add the argument and return types for this Callable.
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -692,6 +718,46 @@ def _populate_requirements_cache(requirements_file, cache_dir):
]
_LOGGER.info('Executing command: %s', cmd_args)
processes.check_output(cmd_args, stderr=processes.STDOUT)
+ else: # try to download wheels
+ language_implementation_tag = 'cp'
+ language_version_tag = '%d%d' % (
+ sys.version_info[0], sys.version_info[1]) # Python version
+ abi_suffix = 'm' if sys.version_info < (
+ 3, 8) else '' # ABI suffix to use for the whl
+ abi_tag = 'cp%d%d%s' % (
+ sys.version_info[0], sys.version_info[1], abi_suffix
+ ) # ABI tag to use
+ # install each package individually
Review comment:
Please add an explanation of why installing each package individually is
necessary in this case.
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -884,3 +933,51 @@ def _download_pypi_sdk_package(
'Failed to download a distribution for the running SDK. '
'Expected either one of %s to be found in the download folder.' %
(expected_files))
+
+ @staticmethod
+ def _download_pypi_package(
+ temp_dir,
+ fetch_binary=False,
+ language_version_tag='27',
+ language_implementation_tag='cp',
+ abi_tag='cp27mu',
+ platform_tag='manylinux2014_x86_64',
Review comment:
Let's create a helper function that returns the target platform, and check
the pip version in that helper. For pip >= 19.3, let's use the
manylinux2014_x86_64 wheel; otherwise use the manylinux2010_x86_64 wheel and
add a TODO: when https://github.com/pypa/pip/issues/10760 is addressed,
download the wheel based on the glibc version in Beam's Python SDK base image.
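Roughly something like the following; a sketch only, where the name
`get_platform_tag` is hypothetical and the version is taken as a string
argument so the helper is easy to test.

```python
import re


def get_platform_tag(pip_version):
  """Returns the manylinux platform tag to request for a given pip version.

  Hypothetical helper for illustration. `pip_version` is a version string
  such as '21.3.1'.
  """
  match = re.match(r'(\d+)(?:\.(\d+))?', pip_version)
  if not match:
    raise ValueError('Unrecognized pip version: %r' % pip_version)
  major = int(match.group(1))
  minor = int(match.group(2) or 0)
  if (major, minor) >= (19, 3):
    return 'manylinux2014_x86_64'
  # TODO: when https://github.com/pypa/pip/issues/10760 is addressed,
  # download the wheel based on the glibc version in Beam's Python SDK
  # base image.
  return 'manylinux2010_x86_64'
```

The caller could pass `pip.__version__` after `import pip`.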
##########
File path: sdks/python/apache_beam/runners/portability/stager.py
##########
@@ -884,3 +933,51 @@ def _download_pypi_sdk_package(
'Failed to download a distribution for the running SDK. '
'Expected either one of %s to be found in the download folder.' %
(expected_files))
+
+ @staticmethod
+ def _download_pypi_package(
+ temp_dir,
+ fetch_binary=False,
+ language_version_tag='27',
+ language_implementation_tag='cp',
+ abi_tag='cp27mu',
+ platform_tag='manylinux2014_x86_64',
+ package_name=None,
Review comment:
(optional) It may make sense to move this param earlier in the argument
list