This is an automated email from the ASF dual-hosted git repository. damccorm pushed a commit to branch revert-27436-unstage_sdk in repository https://gitbox.apache.org/repos/asf/beam.git
commit ace89c994b76308f6201a5e2a8842988a76ad6d8 Author: Danny McCormick <[email protected]> AuthorDate: Tue Aug 22 10:51:01 2023 -0400 Revert "[Python]Don't stage beam SDK in Stager (#27436)" This reverts commit 408d766d55fb56eb2af2b65d72a27618cda84883. --- CHANGES.md | 1 - .../python/apache_beam/options/pipeline_options.py | 10 +- .../apache_beam/runners/portability/stager.py | 192 +++++++++++++++++---- .../apache_beam/runners/portability/stager_test.py | 92 +++++++++- sdks/python/container/boot.go | 5 - sdks/python/container/piputil.go | 13 -- 6 files changed, 255 insertions(+), 58 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 29b8fb2e0ab..cff41bc5971 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -126,7 +126,6 @@ ## Breaking Changes * Python SDK: Legacy runner support removed from Dataflow, all pipelines must use runner v2. -* [Python] Dataflow Runner will no longer stage Beam SDK from PyPI in the `--staging_location` at pipeline submission. Custom container images that are not based on Beam's default image must include Apache Beam installation.([#26996](https://github.com/apache/beam/issues/26996)) ## Deprecations diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 6d9c5ecd37b..5d05259ea71 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1296,13 +1296,11 @@ class SetupOptions(PipelineOptions): '--sdk_location', default='default', help=( - 'Path to a custom Beam SDK package to install and use on the' - 'runner. It can be a URL, a GCS path, or a local path to an ' + 'Override the default location from where the Beam SDK is ' + 'downloaded. It can be a URL, a GCS path, or a local path to an ' 'SDK tarball. Workflow submissions will download or copy an SDK ' - 'tarball from here. If set to "default", ' - 'runners will use the SDK provided in the default environment.' - 'Use this flag when running pipelines with an unreleased or ' - 'manually patched version of Beam SDK.')) + 'tarball from here. If set to the string "default", a standard ' + 'SDK location is used. If empty, no SDK is copied.')) parser.add_argument( '--extra_package', '--extra_packages', diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 1f093b1d7bc..ee920a32516 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -115,6 +115,12 @@ class Stager(object): """Commits manifest.""" raise NotImplementedError + @staticmethod + def get_sdk_package_name(): + """For internal use only; no backwards-compatibility guarantees. + Returns the PyPI package name to be staged.""" + return names.BEAM_PACKAGE_NAME + @staticmethod def _create_file_stage_to_artifact(local_path, staged_name): return beam_runner_api_pb2.ArtifactInformation( @@ -290,28 +296,30 @@ class Stager(object): setup_options.extra_packages, temp_dir=temp_dir)) if hasattr(setup_options, 'sdk_location'): - sdk_location = setup_options.sdk_location - if Stager._is_remote_path(sdk_location): - try: - resources.extend( - Stager._create_beam_sdk( - sdk_remote_location=setup_options.sdk_location, - temp_dir=temp_dir, - )) - except: - raise RuntimeError( - 'The --sdk_location option was used with an unsupported ' - 'type of location: %s' % sdk_location) - - elif sdk_location == 'default': - # Use default location for a runner. - pass - elif sdk_location == 'container': - # Used in the past to indicate that SDK should be used from container - # image instead of being staged. - # Equivalent to 'default' now, leaving for backwards compatibility. + + if (setup_options.sdk_location == 'default') or Stager._is_remote_path( + setup_options.sdk_location): + # If --sdk_location is not specified then the appropriate package + # will be obtained from PyPI (https://pypi.python.org) based on the + # version of the currently running SDK. If the option is + # present then no version matching is made and the exact URL or path + # is expected. + # + # Unit tests running in the 'python setup.py test' context will + # not have the sdk_location attribute present and therefore we + # will not stage SDK. + sdk_remote_location = 'pypi' if ( + setup_options.sdk_location == 'default' + ) else setup_options.sdk_location + resources.extend( + Stager._create_beam_sdk(sdk_remote_location, temp_dir)) + elif setup_options.sdk_location == 'container': + # Use the SDK that's built into the container, rather than re-staging + # it. pass else: + # This branch is also used by internal tests running with the SDK + # built at head. if os.path.isdir(setup_options.sdk_location): sdk_path = os.path.join( setup_options.sdk_location, names.STAGED_SDK_SOURCES_FILENAME) @@ -337,6 +345,7 @@ class Stager(object): raise RuntimeError( 'The file "%s" cannot be found. Its location was specified ' 'by the --sdk_location command-line option.' % sdk_path) + # The following artifacts are not processed by python sdk container boot # sequence in a setup mode and hence should not be skipped even if a # prebuilt sdk container image is used. @@ -815,7 +824,8 @@ class Stager(object): Args: sdk_remote_location: A URL from which the file can be downloaded or a - remote file location. The SDK file can be a tarball or a wheel. + remote file location. The SDK file can be a tarball or a wheel. Set + to 'pypi' to download and stage a wheel and source SDK from PyPi. temp_dir: path to temporary location where the file should be downloaded. @@ -826,14 +836,136 @@ class Stager(object): Raises: RuntimeError: if staging was not successful. """ + if sdk_remote_location == 'pypi': + sdk_local_file = Stager._download_pypi_sdk_package(temp_dir) + sdk_sources_staged_name = Stager.\ + _desired_sdk_filename_in_staging_location(sdk_local_file) + _LOGGER.info('Staging SDK sources from PyPI: %s', sdk_sources_staged_name) + staged_sdk_files = [ + Stager._create_file_stage_to_artifact( + sdk_local_file, sdk_sources_staged_name) + ] + try: + abi_suffix = 'm' if sys.version_info < (3, 8) else '' + # Stage binary distribution of the SDK, for now on a best-effort basis. + platform_tag = Stager._get_platform_for_default_sdk_container() + sdk_local_file = Stager._download_pypi_sdk_package( + temp_dir, + fetch_binary=True, + language_version_tag='%d%d' % + (sys.version_info[0], sys.version_info[1]), + abi_tag='cp%d%d%s' % + (sys.version_info[0], sys.version_info[1], abi_suffix), + platform_tag=platform_tag) + sdk_binary_staged_name = Stager.\ + _desired_sdk_filename_in_staging_location(sdk_local_file) + _LOGGER.info( + 'Staging binary distribution of the SDK from PyPI: %s', + sdk_binary_staged_name) + staged_sdk_files.append( + Stager._create_file_stage_to_artifact( + sdk_local_file, sdk_binary_staged_name)) + except RuntimeError as e: + _LOGGER.warning( + 'Failed to download requested binary distribution ' + 'of the SDK: %s', + repr(e)) - sdk_remote_parsed = urlparse(sdk_remote_location) - sdk_remote_filename = os.path.basename(sdk_remote_parsed.path) - local_download_file = os.path.join(temp_dir, sdk_remote_filename) - Stager._download_file(sdk_remote_location, local_download_file) - staged_name = Stager._desired_sdk_filename_in_staging_location( - local_download_file) - _LOGGER.info('Staging Beam SDK from %s', sdk_remote_location) - return [ - Stager._create_file_stage_to_artifact(local_download_file, staged_name) + return staged_sdk_files + elif Stager._is_remote_path(sdk_remote_location): + sdk_remote_parsed = urlparse(sdk_remote_location) + sdk_remote_filename = os.path.basename(sdk_remote_parsed.path) + local_download_file = os.path.join(temp_dir, sdk_remote_filename) + Stager._download_file(sdk_remote_location, local_download_file) + staged_name = Stager._desired_sdk_filename_in_staging_location( + local_download_file) + _LOGGER.info('Staging Beam SDK from %s', sdk_remote_location) + return [ + Stager._create_file_stage_to_artifact( + local_download_file, staged_name) + ] + else: + raise RuntimeError( + 'The --sdk_location option was used with an unsupported ' + 'type of location: %s' % sdk_remote_location) + + @staticmethod + def _download_pypi_sdk_package( + temp_dir, + fetch_binary=False, + language_version_tag='39', + language_implementation_tag='cp', + abi_tag='cp39', + platform_tag='manylinux2014_x86_64'): + """Downloads SDK package from PyPI and returns path to local path.""" + package_name = Stager.get_sdk_package_name() + try: + version = pkg_resources.get_distribution(package_name).version + except pkg_resources.DistributionNotFound: + raise RuntimeError( + 'Please set --sdk_location command-line option ' + 'or install a valid {} distribution.'.format(package_name)) + cmd_args = [ + Stager._get_python_executable(), + '-m', + 'pip', + 'download', + '--dest', + temp_dir, + '%s==%s' % (package_name, version), + '--no-deps' ] + + if fetch_binary: + _LOGGER.info('Downloading binary distribution of the SDK from PyPi') + # Get a wheel distribution for the SDK from PyPI. + cmd_args.extend([ + '--only-binary', + ':all:', + '--python-version', + language_version_tag, + '--implementation', + language_implementation_tag, + '--abi', + abi_tag, + '--platform', + platform_tag + ]) + # Example wheel: with manylinux14 tag. + # apache_beam-2.43.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl # pylint: disable=line-too-long + if platform_tag == 'manylinux2014_x86_64': + platform_tag = 'manylinux_2_17_x86_64.' + platform_tag + expected_files = [ + os.path.join( + temp_dir, + '%s-%s-%s%s-%s-%s.whl' % ( + package_name.replace('-', '_'), + version, + language_implementation_tag, + language_version_tag, + abi_tag, + platform_tag)), + ] + + else: + _LOGGER.info('Downloading source distribution of the SDK from PyPi') + cmd_args.extend(['--no-binary', ':all:']) + expected_files = [ + os.path.join(temp_dir, '%s-%s.zip' % (package_name, version)), + os.path.join(temp_dir, '%s-%s.tar.gz' % (package_name, version)) + ] + + _LOGGER.info('Executing command: %s', cmd_args) + try: + processes.check_output(cmd_args) + except processes.CalledProcessError as e: + raise RuntimeError(repr(e)) + + for sdk_file in expected_files: + if os.path.exists(sdk_file): + return sdk_file + + raise RuntimeError( + 'Failed to download a distribution for the running SDK. ' + 'Expected either one of %s to be found in the download folder.' % + (expected_files)) diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py index 839f7e57773..bf876925ff7 100644 --- a/sdks/python/apache_beam/runners/portability/stager_test.py +++ b/sdks/python/apache_beam/runners/portability/stager_test.py @@ -96,6 +96,64 @@ class StagerTest(unittest.TestCase): self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing') self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing') + def build_fake_pip_download_command_handler(self, has_wheels): + """A stub for apache_beam.utils.processes.check_output that imitates pip. + + Args: + has_wheels: Whether pip fake should have a whl distribution of packages. + """ + def pip_fake(args): + """Fakes fetching a package from pip by creating a temporary file. + + Args: + args: a complete list of command line arguments to invoke pip. + The fake is sensitive to the order of the arguments. + Supported commands: + + 1) Download SDK sources file: + python pip -m download --dest /tmp/dir apache-beam==2.0.0 \ + --no-deps --no-binary :all: + + 2) Download SDK binary wheel file: + python pip -m download --dest /tmp/dir apache-beam==2.0.0 \ + --no-deps --no-binary :all: --python-version 27 \ + --implementation cp --abi cp27mu --platform manylinux1_x86_64 + """ + package_file = None + if len(args) >= 8: + # package_name==x.y.z + if '==' in args[6]: + distribution_name = args[6][0:args[6].find('==')] + distribution_version = args[6][args[6].find('==') + 2:] + + if args[8] == '--no-binary': + package_file = '%s-%s.zip' % ( + distribution_name, distribution_version) + elif args[8] == '--only-binary' and len(args) >= 18: + if not has_wheels: + # Imitate the case when desired wheel distribution is not in PyPI. + raise RuntimeError('No matching distribution.') + + # Per PEP-0427 in wheel filenames non-alphanumeric characters + # in distribution name are replaced with underscore. + distribution_name = distribution_name.replace('-', '_') + if args[17] == 'manylinux2014_x86_64': + args[17] = 'manylinux_2_17_x86_64.' + args[17] + package_file = '%s-%s-%s%s-%s-%s.whl' % ( + distribution_name, + distribution_version, + args[13], # implementation + args[11], # python version + args[15], # abi tag + args[17] # platform + ) + + assert package_file, 'Pip fake does not support the command: ' + str(args) + self.create_temp_file( + FileSystems.join(args[5], package_file), 'Package content.') + + return pip_fake + @mock.patch('apache_beam.runners.portability.stager.open') @mock.patch('apache_beam.runners.portability.stager.get_new_http') def test_download_file_https(self, mock_new_http, mock_open): @@ -375,10 +433,38 @@ class StagerTest(unittest.TestCase): self.update_options(options) options.view_as(SetupOptions).sdk_location = 'default' - _, staged_resources = self.stager.create_and_stage_job_resources( - options, temp_dir=self.make_temp_dir(), staging_location=staging_dir) + with mock.patch( + 'apache_beam.utils.processes.check_output', + self.build_fake_pip_download_command_handler(has_wheels=False)): + _, staged_resources = self.stager.create_and_stage_job_resources( + options, temp_dir=self.make_temp_dir(), staging_location=staging_dir) + + self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME], staged_resources) + + with open(os.path.join(staging_dir, + names.STAGED_SDK_SOURCES_FILENAME)) as f: + self.assertEqual(f.read(), 'Package content.') + + def test_sdk_location_default_with_wheels(self): + staging_dir = self.make_temp_dir() + + options = PipelineOptions() + self.update_options(options) + options.view_as(SetupOptions).sdk_location = 'default' - self.assertEqual([], staged_resources) + with mock.patch( + 'apache_beam.utils.processes.check_output', + self.build_fake_pip_download_command_handler(has_wheels=True)): + _, staged_resources = self.stager.create_and_stage_job_resources( + options, temp_dir=self.make_temp_dir(), staging_location=staging_dir) + + self.assertEqual(len(staged_resources), 2) + self.assertEqual(staged_resources[0], names.STAGED_SDK_SOURCES_FILENAME) + # Exact name depends on the version of the SDK. + self.assertTrue(staged_resources[1].endswith('whl')) + for name in staged_resources: + with open(os.path.join(staging_dir, name)) as f: + self.assertEqual(f.read(), 'Package content.') def test_sdk_location_local_directory(self): staging_dir = self.make_temp_dir() diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index c67239f0564..dd74acabfd8 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -377,11 +377,6 @@ func installSetupPackages(files []string, workDir string, requirementsFiles []st log.Printf("Failed to setup acceptable wheel specs, leave it as empty: %v", err) } - pkgName := "apache-beam" - isSdkInstalled := isPackageInstalled(pkgName) - if !isSdkInstalled { - return fmt.Errorf("Apache Beam is not installed in the runtime environment. If you use a custom container image, you must install apache-beam package in the custom image using same version of Beam as in the pipeline submission environment. For more information, see: the https://beam.apache.org/documentation/runtime/environments/.") - } // Install the Dataflow Python SDK and worker packages. // We install the extra requirements in case of using the beam sdk. These are ignored by pip // if the user is using an SDK that does not provide these. diff --git a/sdks/python/container/piputil.go b/sdks/python/container/piputil.go index 720bf372c53..b4885e6356d 100644 --- a/sdks/python/container/piputil.go +++ b/sdks/python/container/piputil.go @@ -22,7 +22,6 @@ import ( "fmt" "log" "os" - "os/exec" "path/filepath" "strings" @@ -51,18 +50,6 @@ func pipInstallRequirements(files []string, dir, name string) error { return nil } -// isPackageInstalled checks if the given package is installed in the -// environment. -func isPackageInstalled(pkgName string) bool { - cmd := exec.Command("python", "-m", "pip", "show", pkgName) - if err := cmd.Run(); err != nil { - if _, ok := err.(*exec.ExitError); ok { - return false - } - } - return true -} - // pipInstallPackage installs the given package, if present. func pipInstallPackage(files []string, dir, name string, force, optional bool, extras []string) error { for _, file := range files {
