This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b2525bca68 Add buffered logger to the Python bootloader (#28317)
8b2525bca68 is described below

commit 8b2525bca682162ebb5ad516c98e344dd598dc81
Author: Jack McCluskey <[email protected]>
AuthorDate: Thu Sep 7 10:02:30 2023 -0400

    Add buffered logger to the Python bootloader (#28317)
    
    * [WIP] Add buffered logging to the Python Bootloader
    
    * Take pip out of quiet 1
    
    * Reroute Execute fns to new ExecuteEnvWithIO
---
 sdks/go/container/tools/buffered_logging.go |  4 +--
 sdks/go/pkg/beam/util/execx/exec.go         | 15 +++++---
 sdks/python/container/boot.go               | 16 ++++-----
 sdks/python/container/piputil.go            | 53 +++++++++++++++++++++--------
 4 files changed, 60 insertions(+), 28 deletions(-)

diff --git a/sdks/go/container/tools/buffered_logging.go 
b/sdks/go/container/tools/buffered_logging.go
index 5a810dbfdf1..ef5e8310c3b 100644
--- a/sdks/go/container/tools/buffered_logging.go
+++ b/sdks/go/container/tools/buffered_logging.go
@@ -21,7 +21,7 @@ import (
        "strings"
 )
 
-const INITIAL_LOG_SIZE int = 255
+const initialLogSize int = 255
 
 // BufferedLogger is a wrapper around the FnAPI logging client meant to be used
 // in place of stdout and stderr in bootloader subprocesses. Not intended for
@@ -46,7 +46,7 @@ func (b *BufferedLogger) Write(p []byte) (int, error) {
        }
        n, err := b.builder.Write(p)
        if b.logs == nil {
-               b.logs = make([]string, 0, INITIAL_LOG_SIZE)
+               b.logs = make([]string, 0, initialLogSize)
        }
        b.logs = append(b.logs, b.builder.String())
        b.builder.Reset()
diff --git a/sdks/go/pkg/beam/util/execx/exec.go 
b/sdks/go/pkg/beam/util/execx/exec.go
index 455b5f5ff84..aaaf9355e7c 100644
--- a/sdks/go/pkg/beam/util/execx/exec.go
+++ b/sdks/go/pkg/beam/util/execx/exec.go
@@ -17,6 +17,7 @@
 package execx
 
 import (
+       "io"
        "os"
        "os/exec"
 )
@@ -24,16 +25,22 @@ import (
 // Execute runs the program with the given arguments. It attaches stdio to the
 // child process.
 func Execute(prog string, args ...string) error {
-       return ExecuteEnv(nil, prog, args...)
+       return ExecuteEnvWithIO(nil, os.Stdin, os.Stdout, os.Stderr, prog, 
args...)
 }
 
 // ExecuteEnv runs the program with the given arguments with additional 
environment
 // variables. It attaches stdio to the child process.
 func ExecuteEnv(env map[string]string, prog string, args ...string) error {
+       return ExecuteEnvWithIO(env, os.Stdin, os.Stdout, os.Stderr, prog, 
args...)
+}
+
+// ExecuteEnvWithIO runs the program with the given arguments with additional 
environment
+// variables. It attaches custom IO to the child process.
+func ExecuteEnvWithIO(env map[string]string, stdin io.Reader, stdout, stderr 
io.Writer, prog string, args ...string) error {
        cmd := exec.Command(prog, args...)
-       cmd.Stdin = os.Stdin
-       cmd.Stdout = os.Stdout
-       cmd.Stderr = os.Stderr
+       cmd.Stdin = stdin
+       cmd.Stdout = stdout
+       cmd.Stderr = stderr
        if env != nil {
                cmd.Env = os.Environ()
                for k, v := range env {
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index e7b11daa397..286490c2445 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -205,7 +205,7 @@ func launchSDKProcess() error {
                }
        }
 
-       if setupErr := installSetupPackages(fileNames, dir, requirementsFiles); 
setupErr != nil {
+       if setupErr := installSetupPackages(ctx, logger, fileNames, dir, 
requirementsFiles); setupErr != nil {
                fmtErr := fmt.Errorf("failed to install required packages: %v", 
setupErr)
                // Send error message to logging service before returning up 
the call stack
                logger.Errorf(ctx, fmtErr.Error())
@@ -379,7 +379,7 @@ func setupAcceptableWheelSpecs() error {
 }
 
 // installSetupPackages installs Beam SDK and user dependencies.
-func installSetupPackages(files []string, workDir string, requirementsFiles 
[]string) error {
+func installSetupPackages(ctx context.Context, logger *tools.Logger, files 
[]string, workDir string, requirementsFiles []string) error {
        log.Printf("Installing setup packages ...")
 
        if err := setupAcceptableWheelSpecs(); err != nil {
@@ -389,25 +389,25 @@ func installSetupPackages(files []string, workDir string, 
requirementsFiles []st
        pkgName := "apache-beam"
        isSdkInstalled := isPackageInstalled(pkgName)
        if !isSdkInstalled {
-               return fmt.Errorf("Apache Beam is not installed in the runtime 
environment. If you use a custom container image, you must install apache-beam 
package in the custom image using same version of Beam as in the pipeline 
submission environment. For more information, see: the 
https://beam.apache.org/documentation/runtime/environments/.")
+               return fmt.Errorf("Apache Beam is not installed in the runtime 
environment. If you use a custom container image, you must install apache-beam 
package in the custom image using same version of Beam as in the pipeline 
submission environment. For more information, see: the 
https://beam.apache.org/documentation/runtime/environments/")
        }
        // Install the Dataflow Python SDK and worker packages.
        // We install the extra requirements in case of using the beam sdk. 
These are ignored by pip
        // if the user is using an SDK that does not provide these.
-       if err := installSdk(files, workDir, sdkSrcFile, acceptableWhlSpecs, 
false); err != nil {
+       if err := installSdk(ctx, logger, files, workDir, sdkSrcFile, 
acceptableWhlSpecs, false); err != nil {
                return fmt.Errorf("failed to install SDK: %v", err)
        }
        // The staged files will not disappear due to restarts because workDir 
is a
        // folder that is mapped to the host (and therefore survives restarts).
        for _, f := range requirementsFiles {
-               if err := pipInstallRequirements(files, workDir, f); err != nil 
{
+               if err := pipInstallRequirements(ctx, logger, files, workDir, 
f); err != nil {
                        return fmt.Errorf("failed to install requirements: %v", 
err)
                }
        }
-       if err := installExtraPackages(files, extraPackagesFile, workDir); err 
!= nil {
+       if err := installExtraPackages(ctx, logger, files, extraPackagesFile, 
workDir); err != nil {
                return fmt.Errorf("failed to install extra packages: %v", err)
        }
-       if err := pipInstallPackage(files, workDir, workflowFile, false, true, 
nil); err != nil {
+       if err := pipInstallPackage(ctx, logger, files, workDir, workflowFile, 
false, true, nil); err != nil {
                return fmt.Errorf("failed to install workflow: %v", err)
        }
 
@@ -450,7 +450,7 @@ func processArtifactsInSetupOnlyMode() {
                }
                files[i] = filePayload.GetPath()
        }
-       if setupErr := installSetupPackages(files, workDir, 
[]string{requirementsFile}); setupErr != nil {
+       if setupErr := installSetupPackages(context.Background(), nil, files, 
workDir, []string{requirementsFile}); setupErr != nil {
                log.Fatalf("Failed to install required packages: %v", setupErr)
        }
 }
diff --git a/sdks/python/container/piputil.go b/sdks/python/container/piputil.go
index 350bda049d9..fec5cf0ab50 100644
--- a/sdks/python/container/piputil.go
+++ b/sdks/python/container/piputil.go
@@ -18,6 +18,7 @@ package main
 import (
        "bufio"
        "bytes"
+       "context"
        "errors"
        "fmt"
        "log"
@@ -26,16 +27,18 @@ import (
        "path/filepath"
        "strings"
 
+       "github.com/apache/beam/sdks/v2/go/container/tools"
        
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
 )
 
 // pipInstallRequirements installs the given requirement, if present.
-func pipInstallRequirements(files []string, dir, name string) error {
+func pipInstallRequirements(ctx context.Context, logger *tools.Logger, files 
[]string, dir, name string) error {
        pythonVersion, err := expansionx.GetPythonVersion()
        if err != nil {
                return err
        }
+       bufLogger := tools.NewBufferedLogger(logger)
        for _, file := range files {
                if file == name {
                        // We run the install process in two rounds in order to 
avoid as much
@@ -50,7 +53,13 @@ func pipInstallRequirements(files []string, dir, name 
string) error {
                        // also installs dependencies. The key is that if all 
the packages have
                        // been installed in the first round then this command 
will be a no-op.
                        args = []string{"-m", "pip", "install", "-q", "-r", 
filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check", 
"--find-links", dir}
-                       return execx.Execute(pythonVersion, args...)
+                       err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, 
bufLogger, pythonVersion, args...)
+                       if err != nil {
+                               bufLogger.FlushAtError(ctx)
+                               return err
+                       }
+                       bufLogger.FlushAtDebug(ctx)
+                       return nil
                }
        }
        return nil
@@ -69,11 +78,12 @@ func isPackageInstalled(pkgName string) bool {
 }
 
 // pipInstallPackage installs the given package, if present.
-func pipInstallPackage(files []string, dir, name string, force, optional bool, 
extras []string) error {
+func pipInstallPackage(ctx context.Context, logger *tools.Logger, files 
[]string, dir, name string, force, optional bool, extras []string) error {
        pythonVersion, err := expansionx.GetPythonVersion()
        if err != nil {
                return err
        }
+       bufLogger := tools.NewBufferedLogger(logger)
        for _, file := range files {
                if file == name {
                        var packageSpec = name
@@ -97,19 +107,34 @@ func pipInstallPackage(files []string, dir, name string, 
force, optional bool, e
                                // installed version will match the package 
specified, the package itself
                                // will not be reinstalled, but its 
dependencies will now be resolved and
                                // installed if necessary.  This achieves our 
goal outlined above.
-                               args := []string{"-m", "pip", "install", "-q", 
"--no-cache-dir", "--disable-pip-version-check", "--upgrade", 
"--force-reinstall", "--no-deps",
+                               args := []string{"-m", "pip", "install", 
"--no-cache-dir", "--disable-pip-version-check", "--upgrade", 
"--force-reinstall", "--no-deps",
                                        filepath.Join(dir, packageSpec)}
-                               err := execx.Execute(pythonVersion, args...)
+                               err := execx.ExecuteEnvWithIO(nil, os.Stdin, 
bufLogger, bufLogger, pythonVersion, args...)
                                if err != nil {
+                                       bufLogger.FlushAtError(ctx)
                                        return err
+                               } else {
+                                       bufLogger.FlushAtDebug(ctx)
                                }
-                               args = []string{"-m", "pip", "install", "-q", 
"--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, 
packageSpec)}
-                               return execx.Execute(pythonVersion, args...)
+                               args = []string{"-m", "pip", "install", 
"--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, 
packageSpec)}
+                               err = execx.ExecuteEnvWithIO(nil, os.Stdin, 
bufLogger, bufLogger, pythonVersion, args...)
+                               if err != nil {
+                                       bufLogger.FlushAtError(ctx)
+                                       return err
+                               }
+                               bufLogger.FlushAtDebug(ctx)
+                               return nil
                        }
 
                        // Case when we do not perform a forced reinstall.
-                       args := []string{"-m", "pip", "install", "-q", 
"--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, 
packageSpec)}
-                       return execx.Execute(pythonVersion, args...)
+                       args := []string{"-m", "pip", "install", 
"--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, 
packageSpec)}
+                       err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, 
bufLogger, pythonVersion, args...)
+                       if err != nil {
+                               bufLogger.FlushAtError(ctx)
+                               return err
+                       }
+                       bufLogger.FlushAtDebug(ctx)
+                       return nil
                }
        }
        if optional {
@@ -120,7 +145,7 @@ func pipInstallPackage(files []string, dir, name string, 
force, optional bool, e
 
 // installExtraPackages installs all the packages declared in the extra
 // packages manifest file.
-func installExtraPackages(files []string, extraPackagesFile, dir string) error 
{
+func installExtraPackages(ctx context.Context, logger *tools.Logger, files 
[]string, extraPackagesFile, dir string) error {
        // First check that extra packages manifest file is present.
        for _, file := range files {
                if file != extraPackagesFile {
@@ -139,7 +164,7 @@ func installExtraPackages(files []string, 
extraPackagesFile, dir string) error {
                for s.Scan() {
                        extraPackage := s.Text()
                        log.Printf("Installing extra package: %s", extraPackage)
-                       if err = pipInstallPackage(files, dir, extraPackage, 
true, false, nil); err != nil {
+                       if err = pipInstallPackage(ctx, logger, files, dir, 
extraPackage, true, false, nil); err != nil {
                                return fmt.Errorf("failed to install extra 
package %s: %v", extraPackage, err)
                        }
                }
@@ -167,13 +192,13 @@ func findBeamSdkWhl(files []string, acceptableWhlSpecs 
[]string) string {
 // assume that the pipleine was started with the Beam SDK found in the wheel
 // file, and we try to install it. If not successful, we fall back to 
installing
 // SDK from source tarball provided in sdkSrcFile.
-func installSdk(files []string, workDir string, sdkSrcFile string, 
acceptableWhlSpecs []string, required bool) error {
+func installSdk(ctx context.Context, logger *tools.Logger, files []string, 
workDir string, sdkSrcFile string, acceptableWhlSpecs []string, required bool) 
error {
        sdkWhlFile := findBeamSdkWhl(files, acceptableWhlSpecs)
 
        if sdkWhlFile != "" {
                // by default, pip rejects to install wheel if same version 
already installed
                isDev := strings.Contains(sdkWhlFile, ".dev")
-               err := pipInstallPackage(files, workDir, sdkWhlFile, isDev, 
false, []string{"gcp"})
+               err := pipInstallPackage(ctx, logger, files, workDir, 
sdkWhlFile, isDev, false, []string{"gcp"})
                if err == nil {
                        return nil
                }
@@ -185,6 +210,6 @@ func installSdk(files []string, workDir string, sdkSrcFile 
string, acceptableWhl
                        return nil
                }
        }
-       err := pipInstallPackage(files, workDir, sdkSrcFile, false, false, 
[]string{"gcp"})
+       err := pipInstallPackage(ctx, logger, files, workDir, sdkSrcFile, 
false, false, []string{"gcp"})
        return err
 }

Reply via email to