This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch revert-27436-unstage_sdk
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ace89c994b76308f6201a5e2a8842988a76ad6d8
Author: Danny McCormick <[email protected]>
AuthorDate: Tue Aug 22 10:51:01 2023 -0400

    Revert "[Python]Don't stage beam SDK in Stager (#27436)"
    
    This reverts commit 408d766d55fb56eb2af2b65d72a27618cda84883.
---
 CHANGES.md                                         |   1 -
 .../python/apache_beam/options/pipeline_options.py |  10 +-
 .../apache_beam/runners/portability/stager.py      | 192 +++++++++++++++++----
 .../apache_beam/runners/portability/stager_test.py |  92 +++++++++-
 sdks/python/container/boot.go                      |   5 -
 sdks/python/container/piputil.go                   |  13 --
 6 files changed, 255 insertions(+), 58 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 29b8fb2e0ab..cff41bc5971 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -126,7 +126,6 @@
 ## Breaking Changes
 
 * Python SDK: Legacy runner support removed from Dataflow, all pipelines must use runner v2.
-* [Python] Dataflow Runner will no longer stage Beam SDK from PyPI in the `--staging_location` at pipeline submission. Custom container images that are not based on Beam's default image must include Apache Beam installation.([#26996](https://github.com/apache/beam/issues/26996))
 
 ## Deprecations
 
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 6d9c5ecd37b..5d05259ea71 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -1296,13 +1296,11 @@ class SetupOptions(PipelineOptions):
         '--sdk_location',
         default='default',
         help=(
-            'Path to a custom Beam SDK package to install and use on the'
-            'runner. It can be a URL, a GCS path, or a local path to an '
+            'Override the default location from where the Beam SDK is '
+            'downloaded. It can be a URL, a GCS path, or a local path to an '
             'SDK tarball. Workflow submissions will download or copy an SDK '
-            'tarball from here. If set to "default", '
-            'runners will use the SDK provided in the default environment.'
-            'Use this flag when running pipelines with an unreleased or '
-            'manually patched version of Beam SDK.'))
+            'tarball from here. If set to the string "default", a standard '
+            'SDK location is used. If empty, no SDK is copied.'))
     parser.add_argument(
         '--extra_package',
         '--extra_packages',
diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py
index 1f093b1d7bc..ee920a32516 100644
--- a/sdks/python/apache_beam/runners/portability/stager.py
+++ b/sdks/python/apache_beam/runners/portability/stager.py
@@ -115,6 +115,12 @@ class Stager(object):
     """Commits manifest."""
     raise NotImplementedError
 
+  @staticmethod
+  def get_sdk_package_name():
+    """For internal use only; no backwards-compatibility guarantees.
+        Returns the PyPI package name to be staged."""
+    return names.BEAM_PACKAGE_NAME
+
   @staticmethod
   def _create_file_stage_to_artifact(local_path, staged_name):
     return beam_runner_api_pb2.ArtifactInformation(
@@ -290,28 +296,30 @@ class Stager(object):
                 setup_options.extra_packages, temp_dir=temp_dir))
 
       if hasattr(setup_options, 'sdk_location'):
-        sdk_location = setup_options.sdk_location
-        if Stager._is_remote_path(sdk_location):
-          try:
-            resources.extend(
-                Stager._create_beam_sdk(
-                    sdk_remote_location=setup_options.sdk_location,
-                    temp_dir=temp_dir,
-                ))
-          except:
-            raise RuntimeError(
-                'The --sdk_location option was used with an unsupported '
-                'type of location: %s' % sdk_location)
-
-        elif sdk_location == 'default':
-          # Use default location for a runner.
-          pass
-        elif sdk_location == 'container':
-          # Used in the past to indicate that SDK should be used from container
-          # image instead of being staged.
-          # Equivalent to 'default' now, leaving for backwards compatibility.
+
+        if (setup_options.sdk_location == 'default') or Stager._is_remote_path(
+            setup_options.sdk_location):
+          # If --sdk_location is not specified then the appropriate package
+          # will be obtained from PyPI (https://pypi.python.org) based on the
+          # version of the currently running SDK. If the option is
+          # present then no version matching is made and the exact URL or path
+          # is expected.
+          #
+          # Unit tests running in the 'python setup.py test' context will
+          # not have the sdk_location attribute present and therefore we
+          # will not stage SDK.
+          sdk_remote_location = 'pypi' if (
+              setup_options.sdk_location == 'default'
+          ) else setup_options.sdk_location
+          resources.extend(
+              Stager._create_beam_sdk(sdk_remote_location, temp_dir))
+        elif setup_options.sdk_location == 'container':
+          # Use the SDK that's built into the container, rather than re-staging
+          # it.
           pass
         else:
+          # This branch is also used by internal tests running with the SDK
+          # built at head.
           if os.path.isdir(setup_options.sdk_location):
             sdk_path = os.path.join(
                 setup_options.sdk_location, names.STAGED_SDK_SOURCES_FILENAME)
@@ -337,6 +345,7 @@ class Stager(object):
               raise RuntimeError(
                   'The file "%s" cannot be found. Its location was specified '
                   'by the --sdk_location command-line option.' % sdk_path)
+
     # The following artifacts are not processed by python sdk container boot
     # sequence in a setup mode and hence should not be skipped even if a
     # prebuilt sdk container image is used.
@@ -815,7 +824,8 @@ class Stager(object):
 
       Args:
         sdk_remote_location: A URL from which the file can be downloaded or a
-          remote file location. The SDK file can be a tarball or a wheel.
+          remote file location. The SDK file can be a tarball or a wheel. Set
+          to 'pypi' to download and stage a wheel and source SDK from PyPi.
         temp_dir: path to temporary location where the file should be
           downloaded.
 
@@ -826,14 +836,136 @@ class Stager(object):
       Raises:
         RuntimeError: if staging was not successful.
       """
+    if sdk_remote_location == 'pypi':
+      sdk_local_file = Stager._download_pypi_sdk_package(temp_dir)
+      sdk_sources_staged_name = Stager.\
+          _desired_sdk_filename_in_staging_location(sdk_local_file)
+      _LOGGER.info('Staging SDK sources from PyPI: %s', sdk_sources_staged_name)
+      staged_sdk_files = [
+          Stager._create_file_stage_to_artifact(
+              sdk_local_file, sdk_sources_staged_name)
+      ]
+      try:
+        abi_suffix = 'm' if sys.version_info < (3, 8) else ''
+        # Stage binary distribution of the SDK, for now on a best-effort basis.
+        platform_tag = Stager._get_platform_for_default_sdk_container()
+        sdk_local_file = Stager._download_pypi_sdk_package(
+            temp_dir,
+            fetch_binary=True,
+            language_version_tag='%d%d' %
+            (sys.version_info[0], sys.version_info[1]),
+            abi_tag='cp%d%d%s' %
+            (sys.version_info[0], sys.version_info[1], abi_suffix),
+            platform_tag=platform_tag)
+        sdk_binary_staged_name = Stager.\
+            _desired_sdk_filename_in_staging_location(sdk_local_file)
+        _LOGGER.info(
+            'Staging binary distribution of the SDK from PyPI: %s',
+            sdk_binary_staged_name)
+        staged_sdk_files.append(
+            Stager._create_file_stage_to_artifact(
+                sdk_local_file, sdk_binary_staged_name))
+      except RuntimeError as e:
+        _LOGGER.warning(
+            'Failed to download requested binary distribution '
+            'of the SDK: %s',
+            repr(e))
 
-    sdk_remote_parsed = urlparse(sdk_remote_location)
-    sdk_remote_filename = os.path.basename(sdk_remote_parsed.path)
-    local_download_file = os.path.join(temp_dir, sdk_remote_filename)
-    Stager._download_file(sdk_remote_location, local_download_file)
-    staged_name = Stager._desired_sdk_filename_in_staging_location(
-        local_download_file)
-    _LOGGER.info('Staging Beam SDK from %s', sdk_remote_location)
-    return [
-        Stager._create_file_stage_to_artifact(local_download_file, staged_name)
+      return staged_sdk_files
+    elif Stager._is_remote_path(sdk_remote_location):
+      sdk_remote_parsed = urlparse(sdk_remote_location)
+      sdk_remote_filename = os.path.basename(sdk_remote_parsed.path)
+      local_download_file = os.path.join(temp_dir, sdk_remote_filename)
+      Stager._download_file(sdk_remote_location, local_download_file)
+      staged_name = Stager._desired_sdk_filename_in_staging_location(
+          local_download_file)
+      _LOGGER.info('Staging Beam SDK from %s', sdk_remote_location)
+      return [
+          Stager._create_file_stage_to_artifact(
+              local_download_file, staged_name)
+      ]
+    else:
+      raise RuntimeError(
+          'The --sdk_location option was used with an unsupported '
+          'type of location: %s' % sdk_remote_location)
+
+  @staticmethod
+  def _download_pypi_sdk_package(
+      temp_dir,
+      fetch_binary=False,
+      language_version_tag='39',
+      language_implementation_tag='cp',
+      abi_tag='cp39',
+      platform_tag='manylinux2014_x86_64'):
+    """Downloads SDK package from PyPI and returns path to local path."""
+    package_name = Stager.get_sdk_package_name()
+    try:
+      version = pkg_resources.get_distribution(package_name).version
+    except pkg_resources.DistributionNotFound:
+      raise RuntimeError(
+          'Please set --sdk_location command-line option '
+          'or install a valid {} distribution.'.format(package_name))
+    cmd_args = [
+        Stager._get_python_executable(),
+        '-m',
+        'pip',
+        'download',
+        '--dest',
+        temp_dir,
+        '%s==%s' % (package_name, version),
+        '--no-deps'
     ]
+
+    if fetch_binary:
+      _LOGGER.info('Downloading binary distribution of the SDK from PyPi')
+      # Get a wheel distribution for the SDK from PyPI.
+      cmd_args.extend([
+          '--only-binary',
+          ':all:',
+          '--python-version',
+          language_version_tag,
+          '--implementation',
+          language_implementation_tag,
+          '--abi',
+          abi_tag,
+          '--platform',
+          platform_tag
+      ])
+      # Example wheel: with manylinux14 tag.
+      # apache_beam-2.43.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl # pylint: disable=line-too-long
+      if platform_tag == 'manylinux2014_x86_64':
+        platform_tag = 'manylinux_2_17_x86_64.' + platform_tag
+      expected_files = [
+          os.path.join(
+              temp_dir,
+              '%s-%s-%s%s-%s-%s.whl' % (
+                  package_name.replace('-', '_'),
+                  version,
+                  language_implementation_tag,
+                  language_version_tag,
+                  abi_tag,
+                  platform_tag)),
+      ]
+
+    else:
+      _LOGGER.info('Downloading source distribution of the SDK from PyPi')
+      cmd_args.extend(['--no-binary', ':all:'])
+      expected_files = [
+          os.path.join(temp_dir, '%s-%s.zip' % (package_name, version)),
+          os.path.join(temp_dir, '%s-%s.tar.gz' % (package_name, version))
+      ]
+
+    _LOGGER.info('Executing command: %s', cmd_args)
+    try:
+      processes.check_output(cmd_args)
+    except processes.CalledProcessError as e:
+      raise RuntimeError(repr(e))
+
+    for sdk_file in expected_files:
+      if os.path.exists(sdk_file):
+        return sdk_file
+
+    raise RuntimeError(
+        'Failed to download a distribution for the running SDK. '
+        'Expected either one of %s to be found in the download folder.' %
+        (expected_files))
diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py
index 839f7e57773..bf876925ff7 100644
--- a/sdks/python/apache_beam/runners/portability/stager_test.py
+++ b/sdks/python/apache_beam/runners/portability/stager_test.py
@@ -96,6 +96,64 @@ class StagerTest(unittest.TestCase):
     self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing')
     self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing')
 
+  def build_fake_pip_download_command_handler(self, has_wheels):
+    """A stub for apache_beam.utils.processes.check_output that imitates pip.
+
+      Args:
+        has_wheels: Whether pip fake should have a whl distribution of packages.
+      """
+    def pip_fake(args):
+      """Fakes fetching a package from pip by creating a temporary file.
+
+          Args:
+            args: a complete list of command line arguments to invoke pip.
+              The fake is sensitive to the order of the arguments.
+              Supported commands:
+
+              1) Download SDK sources file:
+              python pip -m download --dest /tmp/dir apache-beam==2.0.0 \
+                  --no-deps --no-binary :all:
+
+              2) Download SDK binary wheel file:
+              python pip -m download --dest /tmp/dir apache-beam==2.0.0 \
+                  --no-deps --no-binary :all: --python-version 27 \
+                  --implementation cp --abi cp27mu --platform manylinux1_x86_64
+          """
+      package_file = None
+      if len(args) >= 8:
+        # package_name==x.y.z
+        if '==' in args[6]:
+          distribution_name = args[6][0:args[6].find('==')]
+          distribution_version = args[6][args[6].find('==') + 2:]
+
+          if args[8] == '--no-binary':
+            package_file = '%s-%s.zip' % (
+                distribution_name, distribution_version)
+          elif args[8] == '--only-binary' and len(args) >= 18:
+            if not has_wheels:
+              # Imitate the case when desired wheel distribution is not in PyPI.
+              raise RuntimeError('No matching distribution.')
+
+            # Per PEP-0427 in wheel filenames non-alphanumeric characters
+            # in distribution name are replaced with underscore.
+            distribution_name = distribution_name.replace('-', '_')
+            if args[17] == 'manylinux2014_x86_64':
+              args[17] = 'manylinux_2_17_x86_64.' + args[17]
+            package_file = '%s-%s-%s%s-%s-%s.whl' % (
+                distribution_name,
+                distribution_version,
+                args[13],  # implementation
+                args[11],  # python version
+                args[15],  # abi tag
+                args[17]  # platform
+            )
+
+      assert package_file, 'Pip fake does not support the command: ' + str(args)
+      self.create_temp_file(
+          FileSystems.join(args[5], package_file), 'Package content.')
+
+    return pip_fake
+
   @mock.patch('apache_beam.runners.portability.stager.open')
   @mock.patch('apache_beam.runners.portability.stager.get_new_http')
   def test_download_file_https(self, mock_new_http, mock_open):
@@ -375,10 +433,38 @@ class StagerTest(unittest.TestCase):
     self.update_options(options)
     options.view_as(SetupOptions).sdk_location = 'default'
 
-    _, staged_resources = self.stager.create_and_stage_job_resources(
-        options, temp_dir=self.make_temp_dir(), staging_location=staging_dir)
+    with mock.patch(
+        'apache_beam.utils.processes.check_output',
+        self.build_fake_pip_download_command_handler(has_wheels=False)):
+      _, staged_resources = self.stager.create_and_stage_job_resources(
+          options, temp_dir=self.make_temp_dir(), staging_location=staging_dir)
+
+    self.assertEqual([names.STAGED_SDK_SOURCES_FILENAME], staged_resources)
+
+    with open(os.path.join(staging_dir,
+                           names.STAGED_SDK_SOURCES_FILENAME)) as f:
+      self.assertEqual(f.read(), 'Package content.')
+
+  def test_sdk_location_default_with_wheels(self):
+    staging_dir = self.make_temp_dir()
+
+    options = PipelineOptions()
+    self.update_options(options)
+    options.view_as(SetupOptions).sdk_location = 'default'
 
-    self.assertEqual([], staged_resources)
+    with mock.patch(
+        'apache_beam.utils.processes.check_output',
+        self.build_fake_pip_download_command_handler(has_wheels=True)):
+      _, staged_resources = self.stager.create_and_stage_job_resources(
+          options, temp_dir=self.make_temp_dir(), staging_location=staging_dir)
+
+      self.assertEqual(len(staged_resources), 2)
+      self.assertEqual(staged_resources[0], names.STAGED_SDK_SOURCES_FILENAME)
+      # Exact name depends on the version of the SDK.
+      self.assertTrue(staged_resources[1].endswith('whl'))
+      for name in staged_resources:
+        with open(os.path.join(staging_dir, name)) as f:
+          self.assertEqual(f.read(), 'Package content.')
 
   def test_sdk_location_local_directory(self):
     staging_dir = self.make_temp_dir()
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index c67239f0564..dd74acabfd8 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -377,11 +377,6 @@ func installSetupPackages(files []string, workDir string, requirementsFiles []st
                log.Printf("Failed to setup acceptable wheel specs, leave it as empty: %v", err)
        }
 
-       pkgName := "apache-beam"
-       isSdkInstalled := isPackageInstalled(pkgName)
-       if !isSdkInstalled {
-               return fmt.Errorf("Apache Beam is not installed in the runtime environment. If you use a custom container image, you must install apache-beam package in the custom image using same version of Beam as in the pipeline submission environment. For more information, see: the https://beam.apache.org/documentation/runtime/environments/.")
-       }
        // Install the Dataflow Python SDK and worker packages.
        // We install the extra requirements in case of using the beam sdk. These are ignored by pip
        // if the user is using an SDK that does not provide these.
diff --git a/sdks/python/container/piputil.go b/sdks/python/container/piputil.go
index 720bf372c53..b4885e6356d 100644
--- a/sdks/python/container/piputil.go
+++ b/sdks/python/container/piputil.go
@@ -22,7 +22,6 @@ import (
        "fmt"
        "log"
        "os"
-       "os/exec"
        "path/filepath"
        "strings"
 
@@ -51,18 +50,6 @@ func pipInstallRequirements(files []string, dir, name string) error {
        return nil
 }
 
-// isPackageInstalled checks if the given package is installed in the
-// environment.
-func isPackageInstalled(pkgName string) bool {
-       cmd := exec.Command("python", "-m", "pip", "show", pkgName)
-       if err := cmd.Run(); err != nil {
-               if _, ok := err.(*exec.ExitError); ok {
-                       return false
-               }
-       }
-       return true
-}
-
 // pipInstallPackage installs the given package, if present.
 func pipInstallPackage(files []string, dir, name string, force, optional bool, extras []string) error {
        for _, file := range files {

Reply via email to