This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8b2525bca68 Add buffered logger to the Python bootloader (#28317)
8b2525bca68 is described below
commit 8b2525bca682162ebb5ad516c98e344dd598dc81
Author: Jack McCluskey <[email protected]>
AuthorDate: Thu Sep 7 10:02:30 2023 -0400
Add buffered logger to the Python bootloader (#28317)
* [WIP] Add buffered logging to the Python Bootloader
* Take pip out of quiet mode (drop -q from pip install invocations)
* Reroute Execute functions to the new ExecuteEnvWithIO
---
sdks/go/container/tools/buffered_logging.go | 4 +--
sdks/go/pkg/beam/util/execx/exec.go | 15 +++++---
sdks/python/container/boot.go | 16 ++++-----
sdks/python/container/piputil.go | 53 +++++++++++++++++++++--------
4 files changed, 60 insertions(+), 28 deletions(-)
diff --git a/sdks/go/container/tools/buffered_logging.go
b/sdks/go/container/tools/buffered_logging.go
index 5a810dbfdf1..ef5e8310c3b 100644
--- a/sdks/go/container/tools/buffered_logging.go
+++ b/sdks/go/container/tools/buffered_logging.go
@@ -21,7 +21,7 @@ import (
"strings"
)
-const INITIAL_LOG_SIZE int = 255
+const initialLogSize int = 255
// BufferedLogger is a wrapper around the FnAPI logging client meant to be used
// in place of stdout and stderr in bootloader subprocesses. Not intended for
@@ -46,7 +46,7 @@ func (b *BufferedLogger) Write(p []byte) (int, error) {
}
n, err := b.builder.Write(p)
if b.logs == nil {
- b.logs = make([]string, 0, INITIAL_LOG_SIZE)
+ b.logs = make([]string, 0, initialLogSize)
}
b.logs = append(b.logs, b.builder.String())
b.builder.Reset()
diff --git a/sdks/go/pkg/beam/util/execx/exec.go
b/sdks/go/pkg/beam/util/execx/exec.go
index 455b5f5ff84..aaaf9355e7c 100644
--- a/sdks/go/pkg/beam/util/execx/exec.go
+++ b/sdks/go/pkg/beam/util/execx/exec.go
@@ -17,6 +17,7 @@
package execx
import (
+ "io"
"os"
"os/exec"
)
@@ -24,16 +25,22 @@ import (
// Execute runs the program with the given arguments. It attaches stdio to the
// child process.
func Execute(prog string, args ...string) error {
- return ExecuteEnv(nil, prog, args...)
+ return ExecuteEnvWithIO(nil, os.Stdin, os.Stdout, os.Stderr, prog,
args...)
}
// ExecuteEnv runs the program with the given arguments with additional
environment
// variables. It attaches stdio to the child process.
func ExecuteEnv(env map[string]string, prog string, args ...string) error {
+ return ExecuteEnvWithIO(env, os.Stdin, os.Stdout, os.Stderr, prog,
args...)
+}
+
+// ExecuteEnvWithIO runs the program with the given arguments with additional
environment
+// variables. It attaches custom IO to the child process.
+func ExecuteEnvWithIO(env map[string]string, stdin io.Reader, stdout, stderr
io.Writer, prog string, args ...string) error {
cmd := exec.Command(prog, args...)
- cmd.Stdin = os.Stdin
- cmd.Stdout = os.Stdout
- cmd.Stderr = os.Stderr
+ cmd.Stdin = stdin
+ cmd.Stdout = stdout
+ cmd.Stderr = stderr
if env != nil {
cmd.Env = os.Environ()
for k, v := range env {
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index e7b11daa397..286490c2445 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -205,7 +205,7 @@ func launchSDKProcess() error {
}
}
- if setupErr := installSetupPackages(fileNames, dir, requirementsFiles);
setupErr != nil {
+ if setupErr := installSetupPackages(ctx, logger, fileNames, dir,
requirementsFiles); setupErr != nil {
fmtErr := fmt.Errorf("failed to install required packages: %v",
setupErr)
// Send error message to logging service before returning up
the call stack
logger.Errorf(ctx, fmtErr.Error())
@@ -379,7 +379,7 @@ func setupAcceptableWheelSpecs() error {
}
// installSetupPackages installs Beam SDK and user dependencies.
-func installSetupPackages(files []string, workDir string, requirementsFiles
[]string) error {
+func installSetupPackages(ctx context.Context, logger *tools.Logger, files
[]string, workDir string, requirementsFiles []string) error {
log.Printf("Installing setup packages ...")
if err := setupAcceptableWheelSpecs(); err != nil {
@@ -389,25 +389,25 @@ func installSetupPackages(files []string, workDir string,
requirementsFiles []st
pkgName := "apache-beam"
isSdkInstalled := isPackageInstalled(pkgName)
if !isSdkInstalled {
- return fmt.Errorf("Apache Beam is not installed in the runtime
environment. If you use a custom container image, you must install apache-beam
package in the custom image using same version of Beam as in the pipeline
submission environment. For more information, see: the
https://beam.apache.org/documentation/runtime/environments/.")
+ return fmt.Errorf("Apache Beam is not installed in the runtime
environment. If you use a custom container image, you must install apache-beam
package in the custom image using same version of Beam as in the pipeline
submission environment. For more information, see: the
https://beam.apache.org/documentation/runtime/environments/")
}
// Install the Dataflow Python SDK and worker packages.
// We install the extra requirements in case of using the beam sdk.
These are ignored by pip
// if the user is using an SDK that does not provide these.
- if err := installSdk(files, workDir, sdkSrcFile, acceptableWhlSpecs,
false); err != nil {
+ if err := installSdk(ctx, logger, files, workDir, sdkSrcFile,
acceptableWhlSpecs, false); err != nil {
return fmt.Errorf("failed to install SDK: %v", err)
}
// The staged files will not disappear due to restarts because workDir
is a
// folder that is mapped to the host (and therefore survives restarts).
for _, f := range requirementsFiles {
- if err := pipInstallRequirements(files, workDir, f); err != nil
{
+ if err := pipInstallRequirements(ctx, logger, files, workDir,
f); err != nil {
return fmt.Errorf("failed to install requirements: %v",
err)
}
}
- if err := installExtraPackages(files, extraPackagesFile, workDir); err
!= nil {
+ if err := installExtraPackages(ctx, logger, files, extraPackagesFile,
workDir); err != nil {
return fmt.Errorf("failed to install extra packages: %v", err)
}
- if err := pipInstallPackage(files, workDir, workflowFile, false, true,
nil); err != nil {
+ if err := pipInstallPackage(ctx, logger, files, workDir, workflowFile,
false, true, nil); err != nil {
return fmt.Errorf("failed to install workflow: %v", err)
}
@@ -450,7 +450,7 @@ func processArtifactsInSetupOnlyMode() {
}
files[i] = filePayload.GetPath()
}
- if setupErr := installSetupPackages(files, workDir,
[]string{requirementsFile}); setupErr != nil {
+ if setupErr := installSetupPackages(context.Background(), nil, files,
workDir, []string{requirementsFile}); setupErr != nil {
log.Fatalf("Failed to install required packages: %v", setupErr)
}
}
diff --git a/sdks/python/container/piputil.go b/sdks/python/container/piputil.go
index 350bda049d9..fec5cf0ab50 100644
--- a/sdks/python/container/piputil.go
+++ b/sdks/python/container/piputil.go
@@ -18,6 +18,7 @@ package main
import (
"bufio"
"bytes"
+ "context"
"errors"
"fmt"
"log"
@@ -26,16 +27,18 @@ import (
"path/filepath"
"strings"
+ "github.com/apache/beam/sdks/v2/go/container/tools"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
)
// pipInstallRequirements installs the given requirement, if present.
-func pipInstallRequirements(files []string, dir, name string) error {
+func pipInstallRequirements(ctx context.Context, logger *tools.Logger, files
[]string, dir, name string) error {
pythonVersion, err := expansionx.GetPythonVersion()
if err != nil {
return err
}
+ bufLogger := tools.NewBufferedLogger(logger)
for _, file := range files {
if file == name {
// We run the install process in two rounds in order to
avoid as much
@@ -50,7 +53,13 @@ func pipInstallRequirements(files []string, dir, name
string) error {
// also installs dependencies. The key is that if all
the packages have
// been installed in the first round then this command
will be a no-op.
args = []string{"-m", "pip", "install", "-q", "-r",
filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check",
"--find-links", dir}
- return execx.Execute(pythonVersion, args...)
+ err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger,
bufLogger, pythonVersion, args...)
+ if err != nil {
+ bufLogger.FlushAtError(ctx)
+ return err
+ }
+ bufLogger.FlushAtDebug(ctx)
+ return nil
}
}
return nil
@@ -69,11 +78,12 @@ func isPackageInstalled(pkgName string) bool {
}
// pipInstallPackage installs the given package, if present.
-func pipInstallPackage(files []string, dir, name string, force, optional bool,
extras []string) error {
+func pipInstallPackage(ctx context.Context, logger *tools.Logger, files
[]string, dir, name string, force, optional bool, extras []string) error {
pythonVersion, err := expansionx.GetPythonVersion()
if err != nil {
return err
}
+ bufLogger := tools.NewBufferedLogger(logger)
for _, file := range files {
if file == name {
var packageSpec = name
@@ -97,19 +107,34 @@ func pipInstallPackage(files []string, dir, name string,
force, optional bool, e
// installed version will match the package
specified, the package itself
// will not be reinstalled, but its
dependencies will now be resolved and
// installed if necessary. This achieves our
goal outlined above.
- args := []string{"-m", "pip", "install", "-q",
"--no-cache-dir", "--disable-pip-version-check", "--upgrade",
"--force-reinstall", "--no-deps",
+ args := []string{"-m", "pip", "install",
"--no-cache-dir", "--disable-pip-version-check", "--upgrade",
"--force-reinstall", "--no-deps",
filepath.Join(dir, packageSpec)}
- err := execx.Execute(pythonVersion, args...)
+ err := execx.ExecuteEnvWithIO(nil, os.Stdin,
bufLogger, bufLogger, pythonVersion, args...)
if err != nil {
+ bufLogger.FlushAtError(ctx)
return err
+ } else {
+ bufLogger.FlushAtDebug(ctx)
}
- args = []string{"-m", "pip", "install", "-q",
"--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir,
packageSpec)}
- return execx.Execute(pythonVersion, args...)
+ args = []string{"-m", "pip", "install",
"--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir,
packageSpec)}
+ err = execx.ExecuteEnvWithIO(nil, os.Stdin,
bufLogger, bufLogger, pythonVersion, args...)
+ if err != nil {
+ bufLogger.FlushAtError(ctx)
+ return err
+ }
+ bufLogger.FlushAtDebug(ctx)
+ return nil
}
// Case when we do not perform a forced reinstall.
- args := []string{"-m", "pip", "install", "-q",
"--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir,
packageSpec)}
- return execx.Execute(pythonVersion, args...)
+ args := []string{"-m", "pip", "install",
"--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir,
packageSpec)}
+ err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger,
bufLogger, pythonVersion, args...)
+ if err != nil {
+ bufLogger.FlushAtError(ctx)
+ return err
+ }
+ bufLogger.FlushAtDebug(ctx)
+ return nil
}
}
if optional {
@@ -120,7 +145,7 @@ func pipInstallPackage(files []string, dir, name string,
force, optional bool, e
// installExtraPackages installs all the packages declared in the extra
// packages manifest file.
-func installExtraPackages(files []string, extraPackagesFile, dir string) error
{
+func installExtraPackages(ctx context.Context, logger *tools.Logger, files
[]string, extraPackagesFile, dir string) error {
// First check that extra packages manifest file is present.
for _, file := range files {
if file != extraPackagesFile {
@@ -139,7 +164,7 @@ func installExtraPackages(files []string,
extraPackagesFile, dir string) error {
for s.Scan() {
extraPackage := s.Text()
log.Printf("Installing extra package: %s", extraPackage)
- if err = pipInstallPackage(files, dir, extraPackage,
true, false, nil); err != nil {
+ if err = pipInstallPackage(ctx, logger, files, dir,
extraPackage, true, false, nil); err != nil {
return fmt.Errorf("failed to install extra
package %s: %v", extraPackage, err)
}
}
@@ -167,13 +192,13 @@ func findBeamSdkWhl(files []string, acceptableWhlSpecs
[]string) string {
// assume that the pipleine was started with the Beam SDK found in the wheel
// file, and we try to install it. If not successful, we fall back to
installing
// SDK from source tarball provided in sdkSrcFile.
-func installSdk(files []string, workDir string, sdkSrcFile string,
acceptableWhlSpecs []string, required bool) error {
+func installSdk(ctx context.Context, logger *tools.Logger, files []string,
workDir string, sdkSrcFile string, acceptableWhlSpecs []string, required bool)
error {
sdkWhlFile := findBeamSdkWhl(files, acceptableWhlSpecs)
if sdkWhlFile != "" {
// by default, pip rejects to install wheel if same version
already installed
isDev := strings.Contains(sdkWhlFile, ".dev")
- err := pipInstallPackage(files, workDir, sdkWhlFile, isDev,
false, []string{"gcp"})
+ err := pipInstallPackage(ctx, logger, files, workDir,
sdkWhlFile, isDev, false, []string{"gcp"})
if err == nil {
return nil
}
@@ -185,6 +210,6 @@ func installSdk(files []string, workDir string, sdkSrcFile
string, acceptableWhl
return nil
}
}
- err := pipInstallPackage(files, workDir, sdkSrcFile, false, false,
[]string{"gcp"})
+ err := pipInstallPackage(ctx, logger, files, workDir, sdkSrcFile,
false, false, []string{"gcp"})
return err
}