tvalentyn commented on code in PR #27436:
URL: https://github.com/apache/beam/pull/27436#discussion_r1275186021


##########
sdks/python/apache_beam/options/pipeline_options.py:
##########
@@ -1284,11 +1284,11 @@ def _add_argparse_args(cls, parser):
         '--sdk_location',
         default='default',
         help=(
-            'Override the default location from where the Beam SDK is '
-            'downloaded. It can be a URL, a GCS path, or a local path to an '
+            'Override the default location of the beam SDK. '

Review Comment:
   ```suggestion
               'Path to a custom Beam SDK package to install and use on the runner. '
   ```



##########
CHANGES.md:
##########
@@ -72,6 +72,7 @@
 ## Breaking Changes
 
 * Legacy runner support removed from Dataflow, all pipelines must use runner v2.
+* [Python]Beam SDK will not be staged from PyPI in the --staging_location for DataflowRunner. If the custom container doesn't include Apache Beam, it will no longer be installed using the staged Beam SDK.([#26996](https://github.com/apache/beam/issues/26996))

Review Comment:
   ```suggestion
   * [Python] Dataflow Runner will no longer stage the Beam SDK from PyPI in the `--staging_location` at pipeline submission. Custom container images that are not based on Beam's default image must include an Apache Beam installation. ([#26996](https://github.com/apache/beam/issues/26996))
   ```



##########
sdks/python/apache_beam/options/pipeline_options.py:
##########
@@ -1284,11 +1284,11 @@ def _add_argparse_args(cls, parser):
         '--sdk_location',
         default='default',
         help=(
-            'Override the default location from where the Beam SDK is '
-            'downloaded. It can be a URL, a GCS path, or a local path to an '
+            'Override the default location of the beam SDK. '
+            'It can be a URL, a GCS path, or a local path to an '
             'SDK tarball. Workflow submissions will download or copy an SDK '

Review Comment:
   Use this flag when running pipelines with an unreleased or manually patched version of the Beam SDK.



##########
sdks/python/apache_beam/options/pipeline_options.py:
##########
@@ -1284,11 +1284,11 @@ def _add_argparse_args(cls, parser):
         '--sdk_location',
         default='default',
         help=(
-            'Override the default location from where the Beam SDK is '
-            'downloaded. It can be a URL, a GCS path, or a local path to an '
+            'Override the default location of the beam SDK. '
+            'It can be a URL, a GCS path, or a local path to an '
             'SDK tarball. Workflow submissions will download or copy an SDK '
-            'tarball from here. If set to the string "default", a standard '
-            'SDK location is used. If empty, no SDK is copied.'))
+            'tarball from here. If set to the string "default", '
+            'the beam SDK in the default container will be used.'))

Review Comment:
   If set to the string "default", runners will use the SDK provided in the default environment.



##########
sdks/python/container/boot.go:
##########
@@ -371,6 +371,16 @@ func installSetupPackages(files []string, workDir string, requirementsFiles []st
                log.Printf("Failed to setup acceptable wheel specs, leave it as empty: %v", err)
        }
 
+       pkgName := "apache-beam"
+       isSdkInstalled, err := isPackageInstalled(pkgName)
+       if err != nil {
+               return fmt.Errorf("failed to check if Apache Beam %s is installed: %v", pkgName, err)
+       }
+
+       if !isSdkInstalled {

Review Comment:
   This should actually be an error, not just a log message.



##########
sdks/python/container/boot.go:
##########
@@ -371,6 +371,16 @@ func installSetupPackages(files []string, workDir string, requirementsFiles []st
                log.Printf("Failed to setup acceptable wheel specs, leave it as empty: %v", err)
        }
 
+       pkgName := "apache-beam"
+       isSdkInstalled, err := isPackageInstalled(pkgName)
+       if err != nil {
+               return fmt.Errorf("failed to check if Apache Beam %s is installed: %v", pkgName, err)
+       }
+
+       if !isSdkInstalled {
+               log.Printf("Apache Beam is not installed on the custom container. Please make sure Apache Beam is installed in the custom container. Look at the docummentation https://beam.apache.org/documentation/runtime/environments/ on custom containers for more details.")

Review Comment:
   Wording suggestion: "Apache Beam is not installed in the runtime environment. If you use a custom container image, you must install the apache-beam package in the custom image, using the same version of Beam as in the pipeline submission environment. For more information, see https://beam.apache.org/documentation/runtime/environments/."



##########
sdks/python/container/boot.go:
##########
@@ -371,6 +371,16 @@ func installSetupPackages(files []string, workDir string, requirementsFiles []st
                log.Printf("Failed to setup acceptable wheel specs, leave it as empty: %v", err)
        }
 
+       pkgName := "apache-beam"
+       isSdkInstalled, err := isPackageInstalled(pkgName)
+       if err != nil {
+               return fmt.Errorf("failed to check if Apache Beam %s is installed: %v", pkgName, err)

Review Comment:
   This shouldn't be a hard error; we can attempt to continue.
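
Taken together with the earlier comment on `if !isSdkInstalled`, the requested policy seems to be: a failed check is logged and startup continues, while a confirmed missing SDK is fatal. The sketch below illustrates that control flow in Python rather than Go, purely for illustration. `ensure_sdk_installed` and its `is_package_installed` callback are hypothetical names, not code from `boot.go`.

```python
import logging


def ensure_sdk_installed(is_package_installed, pkg_name="apache-beam"):
    """Hypothetical helper: soft-fail on check errors, hard-fail if missing."""
    try:
        installed = is_package_installed(pkg_name)
    except Exception as exc:  # the check itself failed; not conclusive
        logging.warning(
            "Could not check whether %s is installed: %s", pkg_name, exc)
        return  # attempt to continue
    if not installed:
        # The check succeeded and the package is definitely absent.
        raise RuntimeError(
            "%s is not installed in the runtime environment." % pkg_name)
```

Passing the check in as a callback keeps the policy separate from how the check is performed, so the same function works with any detection method.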



##########
sdks/python/apache_beam/runners/portability/stager.py:
##########
@@ -296,30 +290,24 @@ def create_job_resources(options,  # type: PipelineOptions
                 setup_options.extra_packages, temp_dir=temp_dir))
 
       if hasattr(setup_options, 'sdk_location'):
-
-        if (setup_options.sdk_location == 'default') or Stager._is_remote_path(
-            setup_options.sdk_location):
-          # If --sdk_location is not specified then the appropriate package
-          # will be obtained from PyPI (https://pypi.python.org) based on the
-          # version of the currently running SDK. If the option is
-          # present then no version matching is made and the exact URL or path
-          # is expected.
-          #
-          # Unit tests running in the 'python setup.py test' context will
-          # not have the sdk_location attribute present and therefore we
-          # will not stage SDK.
-          sdk_remote_location = 'pypi' if (
-              setup_options.sdk_location == 'default'
-          ) else setup_options.sdk_location
-          resources.extend(
-              Stager._create_beam_sdk(sdk_remote_location, temp_dir))
-        elif setup_options.sdk_location == 'container':
-          # Use the SDK that's built into the container, rather than re-staging
-          # it.
+        sdk_location = setup_options.sdk_location
+        # check if it is remote location
+        if Stager._is_remote_path(sdk_location):
+          try:
+            resources.extend(
+                Stager._create_beam_sdk(
+                    sdk_remote_location=setup_options.sdk_location,
+                    temp_dir=temp_dir,
+                ))
+          except:
+            raise RuntimeError(
+                'The --sdk_location option was used with an unsupported '
+                'type of location: %s' % sdk_location)
+
+        elif sdk_location == 'default' or sdk_location == 'container':

Review Comment:
   ```
     elif sdk_location == 'default':
       # Use default location for a runner.
       pass
     elif sdk_location == 'container':
       # Used in the past to indicate that SDK should be used from container
       # image instead of being staged.
       # Equivalent to 'default' now, leaving for backwards compatibility
       # reasons.
       pass
   ```
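
To see the suggested branching in isolation, here is a minimal standalone sketch. `classify_sdk_location` and its return labels are hypothetical, and the `'://'` test is only an assumed stand-in for `Stager._is_remote_path`. Values that match none of these branches are left to whatever the real `create_job_resources` does next.

```python
def classify_sdk_location(sdk_location):
    """Hypothetical helper mirroring the branch order suggested above."""
    if "://" in sdk_location:  # assumed stand-in for Stager._is_remote_path
        return "stage_remote"
    elif sdk_location == "default":
        # Use default location for a runner.
        return "use_default"
    elif sdk_location == "container":
        # Kept for backwards compatibility; equivalent to 'default' now.
        return "use_default"
    # Anything else is outside the scope of this sketch.
    return "other"
```

Keeping `'default'` and `'container'` as separate branches costs one extra line but leaves room for a comment explaining why the legacy value is still accepted.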



##########
sdks/python/container/piputil.go:
##########
@@ -52,6 +53,23 @@ func pipInstallRequirements(files []string, dir, name string) error {
        return nil
 }
 
+
+func isPackageInstalled(pkgName string) (bool, error) {

Review Comment:
   `pip show` would be a better way.

   Substring searches would confuse `tensorflow` with `tensorflow-metadata`, for example.
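
The pitfall can be reproduced with a small sketch, written in Python for illustration even though the code under review is Go. All function names are hypothetical and the package listing is made-up sample data. `pip_show_is_installed` assumes that `pip show` exits with a non-zero status when the package is not found.

```python
import subprocess
import sys


def naive_is_installed(freeze_output, pkg_name):
    """Substring search over `pip freeze`-style text (the pitfall)."""
    return pkg_name in freeze_output


def exact_is_installed(freeze_output, pkg_name):
    """Compare whole distribution names taken from `name==version` lines."""
    names = {
        line.split("==", 1)[0].strip().lower()
        for line in freeze_output.splitlines()
        if line.strip()
    }
    return pkg_name.lower() in names


def pip_show_is_installed(pkg_name):
    """Ask pip about exactly one distribution.

    Assumption: a non-zero exit status means "not found". A broken pip
    would also produce a non-zero status, so this cannot tell them apart.
    """
    result = subprocess.run(
        [sys.executable, "-m", "pip", "show", pkg_name],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        check=False,
    )
    return result.returncode == 0


# Illustrative listing in which only tensorflow-metadata is present.
listing = "tensorflow-metadata==1.13.1\nnumpy==1.24.3\n"
```

With this listing, `naive_is_installed(listing, "tensorflow")` reports a match because of the shared prefix, while `exact_is_installed(listing, "tensorflow")` does not.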



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
