tvalentyn commented on code in PR #27436:
URL: https://github.com/apache/beam/pull/27436#discussion_r1275186021
##########
sdks/python/apache_beam/options/pipeline_options.py:
##########
@@ -1284,11 +1284,11 @@ def _add_argparse_args(cls, parser):
'--sdk_location',
default='default',
help=(
- 'Override the default location from where the Beam SDK is '
- 'downloaded. It can be a URL, a GCS path, or a local path to an '
+ 'Override the default location of the beam SDK. '
Review Comment:
```suggestion
'Path to a custom Beam SDK package to install and use on the runner. '
```
##########
CHANGES.md:
##########
@@ -72,6 +72,7 @@
## Breaking Changes
* Legacy runner support removed from Dataflow, all pipelines must use runner v2.
+* [Python]Beam SDK will not be staged from PyPI in the --staging_location for DataflowRunner. If the custom container doesn't include Apache Beam, it will no longer be installed using the staged Beam SDK.([#26996](https://github.com/apache/beam/issues/26996))
Review Comment:
```suggestion
* [Python] Dataflow Runner will no longer stage the Beam SDK from PyPI in the `--staging_location` at pipeline submission. Custom container images that are not based on Beam's default image must include an Apache Beam installation ([#26996](https://github.com/apache/beam/issues/26996)).
```
##########
sdks/python/apache_beam/options/pipeline_options.py:
##########
@@ -1284,11 +1284,11 @@ def _add_argparse_args(cls, parser):
'--sdk_location',
default='default',
help=(
- 'Override the default location from where the Beam SDK is '
- 'downloaded. It can be a URL, a GCS path, or a local path to an '
+ 'Override the default location of the beam SDK. '
+ 'It can be a URL, a GCS path, or a local path to an '
'SDK tarball. Workflow submissions will download or copy an SDK '
Review Comment:
Use this flag when running pipelines with an unreleased or manually patched
version of the Beam SDK.
##########
sdks/python/apache_beam/options/pipeline_options.py:
##########
@@ -1284,11 +1284,11 @@ def _add_argparse_args(cls, parser):
'--sdk_location',
default='default',
help=(
- 'Override the default location from where the Beam SDK is '
- 'downloaded. It can be a URL, a GCS path, or a local path to an '
+ 'Override the default location of the beam SDK. '
+ 'It can be a URL, a GCS path, or a local path to an '
'SDK tarball. Workflow submissions will download or copy an SDK '
- 'tarball from here. If set to the string "default", a standard '
- 'SDK location is used. If empty, no SDK is copied.'))
+ 'tarball from here. If set to the string "default", '
+ 'the beam SDK in the default container will be used.'))
Review Comment:
If set to the string "default", runners will use the SDK provided in the
default environment.
##########
sdks/python/container/boot.go:
##########
@@ -371,6 +371,16 @@ func installSetupPackages(files []string, workDir string, requirementsFiles []st
log.Printf("Failed to setup acceptable wheel specs, leave it as empty: %v", err)
}
+ pkgName := "apache-beam"
+ isSdkInstalled, err := isPackageInstalled(pkgName)
+ if err != nil {
+ return fmt.Errorf("failed to check if Apache Beam %s is installed: %v", pkgName, err)
+ }
+
+ if !isSdkInstalled {
Review Comment:
this should actually be an error.
##########
sdks/python/container/boot.go:
##########
@@ -371,6 +371,16 @@ func installSetupPackages(files []string, workDir string, requirementsFiles []st
log.Printf("Failed to setup acceptable wheel specs, leave it as empty: %v", err)
}
+ pkgName := "apache-beam"
+ isSdkInstalled, err := isPackageInstalled(pkgName)
+ if err != nil {
+ return fmt.Errorf("failed to check if Apache Beam %s is installed: %v", pkgName, err)
+ }
+
+ if !isSdkInstalled {
+ log.Printf("Apache Beam is not installed on the custom container. Please make sure Apache Beam is installed in the custom container. Look at the docummentation https://beam.apache.org/documentation/runtime/environments/ on custom containers for more details.")
Review Comment:
wording suggestion: "Apache Beam is not installed in the runtime
environment. If you use a custom container image, you must install the
apache-beam package in the custom image using the same version of Beam as in
the pipeline submission environment. For more information, see
https://beam.apache.org/documentation/runtime/environments/."
##########
sdks/python/container/boot.go:
##########
@@ -371,6 +371,16 @@ func installSetupPackages(files []string, workDir string, requirementsFiles []st
log.Printf("Failed to setup acceptable wheel specs, leave it as empty: %v", err)
}
+ pkgName := "apache-beam"
+ isSdkInstalled, err := isPackageInstalled(pkgName)
+ if err != nil {
+ return fmt.Errorf("failed to check if Apache Beam %s is installed: %v", pkgName, err)
Review Comment:
this shouldn't be a hard error; we can attempt to continue.
##########
sdks/python/apache_beam/runners/portability/stager.py:
##########
@@ -296,30 +290,24 @@ def create_job_resources(options, # type: PipelineOptions
setup_options.extra_packages, temp_dir=temp_dir))
if hasattr(setup_options, 'sdk_location'):
-
- if (setup_options.sdk_location == 'default') or Stager._is_remote_path(
- setup_options.sdk_location):
- # If --sdk_location is not specified then the appropriate package
- # will be obtained from PyPI (https://pypi.python.org) based on the
- # version of the currently running SDK. If the option is
- # present then no version matching is made and the exact URL or path
- # is expected.
- #
- # Unit tests running in the 'python setup.py test' context will
- # not have the sdk_location attribute present and therefore we
- # will not stage SDK.
- sdk_remote_location = 'pypi' if (
- setup_options.sdk_location == 'default'
- ) else setup_options.sdk_location
- resources.extend(
- Stager._create_beam_sdk(sdk_remote_location, temp_dir))
- elif setup_options.sdk_location == 'container':
- # Use the SDK that's built into the container, rather than re-staging
- # it.
+ sdk_location = setup_options.sdk_location
+ # check if it is remote location
+ if Stager._is_remote_path(sdk_location):
+ try:
+ resources.extend(
+ Stager._create_beam_sdk(
+ sdk_remote_location=setup_options.sdk_location,
+ temp_dir=temp_dir,
+ ))
+ except:
+ raise RuntimeError(
+ 'The --sdk_location option was used with an unsupported '
+ 'type of location: %s' % sdk_location)
+
+ elif sdk_location == 'default' or sdk_location == 'container':
Review Comment:
```
elif sdk_location == 'default':
  # Use default location for a runner.
  pass
elif sdk_location == 'container':
  # Used in the past to indicate that SDK should be used from container
  # image instead of being staged.
  # Equivalent to 'default' now, leaving for backwards compatibility
  # reasons.
  pass
```
##########
sdks/python/container/piputil.go:
##########
@@ -52,6 +53,23 @@ func pipInstallRequirements(files []string, dir, name string) error {
return nil
}
+
+func isPackageInstalled(pkgName string) (bool, error) {
Review Comment:
`pip show` would be a better way: substring searches would confuse
`tensorflow` with `tensorflow-metadata`, for example.
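A minimal sketch of the `pip show` approach, assuming the interpreter is invoked as `<pythonExe> -m pip` (the `pythonExe` parameter is an assumption, not the current signature). `pip show` looks the distribution up by name rather than by substring, and exits non-zero when it is not found.

```go
package main

import (
	"errors"
	"os/exec"
)

// isPackageInstalled sketches the `pip show` check. pythonExe is an assumed
// parameter; boot.go may locate the interpreter differently.
func isPackageInstalled(pythonExe, pkgName string) (bool, error) {
	// Stdout/Stderr are left nil, so pip's output is discarded.
	cmd := exec.Command(pythonExe, "-m", "pip", "show", pkgName)
	if err := cmd.Run(); err != nil {
		var exitErr *exec.ExitError
		if errors.As(err, &exitErr) {
			// pip show exits non-zero when the package is not found. Note that
			// a missing pip module also exits non-zero and lands here.
			return false, nil
		}
		// The command could not be started at all (e.g. interpreter not found).
		return false, err
	}
	return true, nil
}
```

Distinguishing `*exec.ExitError` from start-up failures lets the caller apply the soft-failure policy only when the check genuinely could not run.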