damccorm commented on code in PR #27236:
URL: https://github.com/apache/beam/pull/27236#discussion_r1261215076
##########
sdks/python/container/boot.go:
##########
@@ -91,12 +92,11 @@ func main() {
"--container_executable=/opt/apache/beam/boot",
}
log.Printf("Starting worker pool %v: python %v", workerPoolId,
strings.Join(args, " "))
- if err := execx.Execute("python", args...); err != nil {
- log.Fatalf("Python SDK worker pool exited with error:
%v", err)
+ if pythonVersion, err := expansionx.getPythonVersion(); err !=
nil {
+ log.Print("Python SDK worker pool exited.")
+ os.Exit(0)
}
- log.Print("Python SDK worker pool exited.")
- os.Exit(0)
- }
+ log.Fatalf("Python SDK worker pool exited with error: %v", err)
Review Comment:
Looks like we're missing a closing curly brace here
##########
sdks/python/container/piputil.go:
##########
@@ -38,21 +38,31 @@ func pipInstallRequirements(files []string, dir, name
string) error {
// option will make sure that only things staged in the
worker will be
// used without following their dependencies.
args := []string{"-m", "pip", "install", "-r",
filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check",
"--no-index", "--no-deps", "--find-links", dir}
- if err := execx.Execute("python", args...); err != nil {
+ pythonVersion, err := expansionx.getPythonVersion()
+ if err != nil {
fmt.Println("Some packages could not be
installed solely from the requirements cache. Installing packages from PyPI.")
+ return fmt.Errorf("Python interpreter is not
available: %v", err)
}
// The second install round opens up the search for
packages on PyPI and
// also installs dependencies. The key is that if all
the packages have
// been installed in the first round then this command
will be a no-op.
args = []string{"-m", "pip", "install", "-r",
filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check",
"--find-links", dir}
- return execx.Execute("python", args...)
+ if err := execx.Execute(pythonVersion, args...); err !=
nil {
+ return fmt.Errorf("Could not start the
expansion service: %v", err)
+ }
+ return nil
+
}
}
return nil
}
// pipInstallPackage installs the given package, if present.
func pipInstallPackage(files []string, dir, name string, force, optional bool,
extras []string) error {
+ pythonVersion, err := expansionx.getPythonVersion()
+ if err != nil {
+ return fmt.Errorf("python interpreter is not available.")
+ }
Review Comment:
I think having a single pythonVersion check at the start of a function is a
cleaner way of doing this. Could we do the same in the other functions?
##########
sdks/python/container/piputil.go:
##########
@@ -38,21 +38,31 @@ func pipInstallRequirements(files []string, dir, name
string) error {
// option will make sure that only things staged in the
worker will be
// used without following their dependencies.
args := []string{"-m", "pip", "install", "-r",
filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check",
"--no-index", "--no-deps", "--find-links", dir}
- if err := execx.Execute("python", args...); err != nil {
+ pythonVersion, err := expansionx.getPythonVersion()
+ if err != nil {
fmt.Println("Some packages could not be
installed solely from the requirements cache. Installing packages from PyPI.")
+ return fmt.Errorf("Python interpreter is not
available: %v", err)
}
// The second install round opens up the search for
packages on PyPI and
// also installs dependencies. The key is that if all
the packages have
// been installed in the first round then this command
will be a no-op.
args = []string{"-m", "pip", "install", "-r",
filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check",
"--find-links", dir}
- return execx.Execute("python", args...)
+ if err := execx.Execute(pythonVersion, args...); err !=
nil {
+ return fmt.Errorf("Could not start the
expansion service: %v", err)
+ }
+ return nil
Review Comment:
This can be simplified to `return execx.Execute(pythonVersion, args...)`
##########
sdks/python/container/boot.go:
##########
@@ -91,12 +92,11 @@ func main() {
"--container_executable=/opt/apache/beam/boot",
}
log.Printf("Starting worker pool %v: python %v", workerPoolId,
strings.Join(args, " "))
- if err := execx.Execute("python", args...); err != nil {
- log.Fatalf("Python SDK worker pool exited with error:
%v", err)
+ if pythonVersion, err := expansionx.getPythonVersion(); err !=
nil {
+ log.Print("Python SDK worker pool exited.")
+ os.Exit(0)
}
Review Comment:
We should be calling `execx.Execute(pythonVersion, args...)` here, right?
##########
sdks/python/container/piputil.go:
##########
@@ -38,21 +38,31 @@ func pipInstallRequirements(files []string, dir, name
string) error {
// option will make sure that only things staged in the
worker will be
// used without following their dependencies.
args := []string{"-m", "pip", "install", "-r",
filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check",
"--no-index", "--no-deps", "--find-links", dir}
- if err := execx.Execute("python", args...); err != nil {
+ pythonVersion, err := expansionx.getPythonVersion()
+ if err != nil {
fmt.Println("Some packages could not be
installed solely from the requirements cache. Installing packages from PyPI.")
Review Comment:
This warning should come after calling `execx.Execute(pythonVersion,
args...)`, right? I don't think that line should've been removed entirely; it
should come after this err check.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]