This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ddcf5875730 Route remaining prints in boot.go through logging service if possible, add periodic flush for PIP invocations (#28413)
ddcf5875730 is described below

commit ddcf58757302401a1f714eb2167069b13ef1645a
Author: Jack McCluskey <[email protected]>
AuthorDate: Wed Sep 13 10:09:05 2023 -0400

    Route remaining prints in boot.go through logging service if possible, add periodic flush for PIP invocations (#28413)
    
    * Route remaining prints in boot.go through logging service if possible
    
    * Add periodic flush option for pip invocation
    
    * Reset lastFlush properly
    
    * Address comments
    
    * Set interval to 15 seconds
---
 sdks/go/container/tools/buffered_logging.go      | 36 ++++++++++--
 sdks/go/container/tools/buffered_logging_test.go | 72 ++++++++++++++++++++++++
 sdks/python/container/boot.go                    |  5 +-
 sdks/python/container/piputil.go                 | 22 +++++---
 4 files changed, 120 insertions(+), 15 deletions(-)

diff --git a/sdks/go/container/tools/buffered_logging.go 
b/sdks/go/container/tools/buffered_logging.go
index ef5e8310c3b..445d19fabfd 100644
--- a/sdks/go/container/tools/buffered_logging.go
+++ b/sdks/go/container/tools/buffered_logging.go
@@ -17,8 +17,11 @@ package tools
 
 import (
        "context"
+       "log"
+       "math"
        "os"
        "strings"
+       "time"
 )
 
 const initialLogSize int = 255
@@ -27,14 +30,24 @@ const initialLogSize int = 255
 // in place of stdout and stderr in bootloader subprocesses. Not intended for
 // Beam end users.
 type BufferedLogger struct {
-       logger  *Logger
-       builder strings.Builder
-       logs    []string
+       logger               *Logger
+       builder              strings.Builder
+       logs                 []string
+       lastFlush            time.Time
+       flushInterval        time.Duration
+       periodicFlushContext context.Context
+       now                  func() time.Time
 }
 
 // NewBufferedLogger returns a new BufferedLogger type by reference.
 func NewBufferedLogger(logger *Logger) *BufferedLogger {
-       return &BufferedLogger{logger: logger}
+       return &BufferedLogger{logger: logger, lastFlush: time.Now(), 
flushInterval: time.Duration(math.MaxInt64), periodicFlushContext: 
context.Background(), now: time.Now}
+}
+
+// NewBufferedLoggerWithFlushInterval returns a new BufferedLogger type by 
reference. This type will
+// flush logs periodically on Write() calls as well as when Flush*() functions 
are called.
+func NewBufferedLoggerWithFlushInterval(ctx context.Context, logger *Logger, 
interval time.Duration) *BufferedLogger {
+       return &BufferedLogger{logger: logger, lastFlush: time.Now(), 
flushInterval: interval, periodicFlushContext: ctx, now: time.Now}
 }
 
 // Write implements the io.Writer interface, converting input to a string
@@ -50,6 +63,9 @@ func (b *BufferedLogger) Write(p []byte) (int, error) {
        }
        b.logs = append(b.logs, b.builder.String())
        b.builder.Reset()
+       if b.now().Sub(b.lastFlush) > b.flushInterval {
+               b.FlushAtDebug(b.periodicFlushContext)
+       }
        return n, err
 }
 
@@ -63,6 +79,7 @@ func (b *BufferedLogger) FlushAtError(ctx context.Context) {
                b.logger.Errorf(ctx, message)
        }
        b.logs = nil
+       b.lastFlush = time.Now()
 }
 
 // FlushAtDebug flushes the contents of the buffer to the logging
@@ -75,4 +92,15 @@ func (b *BufferedLogger) FlushAtDebug(ctx context.Context) {
                b.logger.Printf(ctx, message)
        }
        b.logs = nil
+       b.lastFlush = time.Now()
+}
+
+// Prints directly to the logging service. If the logger is nil, prints 
directly to the
+// console. Used for the container pre-build workflow.
+func (b *BufferedLogger) Printf(ctx context.Context, format string, args 
...any) {
+       if b.logger == nil {
+               log.Printf(format, args...)
+               return
+       }
+       b.logger.Printf(ctx, format, args...)
 }
diff --git a/sdks/go/container/tools/buffered_logging_test.go 
b/sdks/go/container/tools/buffered_logging_test.go
index 8feef7b413d..9f542d2d5ab 100644
--- a/sdks/go/container/tools/buffered_logging_test.go
+++ b/sdks/go/container/tools/buffered_logging_test.go
@@ -18,6 +18,7 @@ package tools
 import (
        "context"
        "testing"
+       "time"
 
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
 )
@@ -166,4 +167,75 @@ func TestBufferedLogger(t *testing.T) {
                        }
                }
        })
+
+       t.Run("direct print", func(t *testing.T) {
+               catcher := &logCatcher{}
+               l := &Logger{client: catcher}
+               bl := NewBufferedLogger(l)
+
+               bl.Printf(ctx, "foo %v", "bar")
+
+               received := catcher.msgs[0].GetLogEntries()[0]
+
+               if got, want := received.Message, "foo bar"; got != want {
+                       t.Errorf("l.Printf(\"foo %%v\", \"bar\"): got message 
%q, want %q", got, want)
+               }
+
+               if got, want := received.Severity, 
fnpb.LogEntry_Severity_DEBUG; got != want {
+                       t.Errorf("l.Printf(\"foo %%v\", \"bar\"): got severity 
%v, want %v", got, want)
+               }
+       })
+
+       t.Run("debug flush at interval", func(t *testing.T) {
+               catcher := &logCatcher{}
+               l := &Logger{client: catcher}
+               interval := 5 * time.Second
+               bl := NewBufferedLoggerWithFlushInterval(context.Background(), 
l, interval)
+
+               startTime := time.Now()
+               bl.now = func() time.Time { return startTime }
+
+               messages := []string{"foo", "bar"}
+
+               for i, message := range messages {
+                       if i > 1 {
+                               bl.now = func() time.Time { return 
startTime.Add(6 * time.Second) }
+                       }
+                       messBytes := []byte(message)
+                       n, err := bl.Write(messBytes)
+
+                       if err != nil {
+                               t.Errorf("got error %v", err)
+                       }
+                       if got, want := n, len(messBytes); got != want {
+                               t.Errorf("got %d bytes written, want %d", got, 
want)
+                       }
+               }
+
+               lastMessage := "baz"
+               bl.now = func() time.Time { return startTime.Add(6 * 
time.Second) }
+               messBytes := []byte(lastMessage)
+               n, err := bl.Write(messBytes)
+
+               if err != nil {
+                       t.Errorf("got error %v", err)
+               }
+               if got, want := n, len(messBytes); got != want {
+                       t.Errorf("got %d bytes written, want %d", got, want)
+               }
+
+               // Type should have auto-flushed at debug after the third 
message
+               received := catcher.msgs[0].GetLogEntries()
+               messages = append(messages, lastMessage)
+
+               for i, message := range received {
+                       if got, want := message.Message, messages[i]; got != 
want {
+                               t.Errorf("got message %q, want %q", got, want)
+                       }
+
+                       if got, want := message.Severity, 
fnpb.LogEntry_Severity_DEBUG; got != want {
+                               t.Errorf("got severity %v, want %v", got, want)
+                       }
+               }
+       })
 }
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index da3b4a1536e..ded10a44204 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -378,10 +378,11 @@ func setupAcceptableWheelSpecs() error {
 
 // installSetupPackages installs Beam SDK and user dependencies.
 func installSetupPackages(ctx context.Context, logger *tools.Logger, files 
[]string, workDir string, requirementsFiles []string) error {
-       log.Printf("Installing setup packages ...")
+       bufLogger := tools.NewBufferedLogger(logger)
+       bufLogger.Printf(ctx, "Installing setup packages ...")
 
        if err := setupAcceptableWheelSpecs(); err != nil {
-               log.Printf("Failed to setup acceptable wheel specs, leave it as 
empty: %v", err)
+               bufLogger.Printf(ctx, "Failed to setup acceptable wheel specs, 
leave it as empty: %v", err)
        }
 
        pkgName := "apache-beam"
diff --git a/sdks/python/container/piputil.go b/sdks/python/container/piputil.go
index fec5cf0ab50..67488bdc39f 100644
--- a/sdks/python/container/piputil.go
+++ b/sdks/python/container/piputil.go
@@ -21,11 +21,11 @@ import (
        "context"
        "errors"
        "fmt"
-       "log"
        "os"
        "os/exec"
        "path/filepath"
        "strings"
+       "time"
 
        "github.com/apache/beam/sdks/v2/go/container/tools"
        
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
@@ -47,7 +47,7 @@ func pipInstallRequirements(ctx context.Context, logger 
*tools.Logger, files []s
                        // used without following their dependencies.
                        args := []string{"-m", "pip", "install", "-q", "-r", 
filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check", 
"--no-index", "--no-deps", "--find-links", dir}
                        if err := execx.Execute(pythonVersion, args...); err != 
nil {
-                               fmt.Println("Some packages could not be 
installed solely from the requirements cache. Installing packages from PyPI.")
+                               bufLogger.Printf(ctx, "Some packages could not 
be installed solely from the requirements cache. Installing packages from 
PyPI.")
                        }
                        // The second install round opens up the search for 
packages on PyPI and
                        // also installs dependencies. The key is that if all 
the packages have
@@ -77,13 +77,15 @@ func isPackageInstalled(pkgName string) bool {
        return true
 }
 
+const pipLogFlushInterval time.Duration = 15 * time.Second
+
 // pipInstallPackage installs the given package, if present.
 func pipInstallPackage(ctx context.Context, logger *tools.Logger, files 
[]string, dir, name string, force, optional bool, extras []string) error {
        pythonVersion, err := expansionx.GetPythonVersion()
        if err != nil {
                return err
        }
-       bufLogger := tools.NewBufferedLogger(logger)
+       bufLogger := tools.NewBufferedLoggerWithFlushInterval(ctx, logger, 
pipLogFlushInterval)
        for _, file := range files {
                if file == name {
                        var packageSpec = name
@@ -146,6 +148,7 @@ func pipInstallPackage(ctx context.Context, logger 
*tools.Logger, files []string
 // installExtraPackages installs all the packages declared in the extra
 // packages manifest file.
 func installExtraPackages(ctx context.Context, logger *tools.Logger, files 
[]string, extraPackagesFile, dir string) error {
+       bufLogger := tools.NewBufferedLogger(logger)
        // First check that extra packages manifest file is present.
        for _, file := range files {
                if file != extraPackagesFile {
@@ -163,7 +166,7 @@ func installExtraPackages(ctx context.Context, logger 
*tools.Logger, files []str
 
                for s.Scan() {
                        extraPackage := s.Text()
-                       log.Printf("Installing extra package: %s", extraPackage)
+                       bufLogger.Printf(ctx, "Installing extra package: %s", 
extraPackage)
                        if err = pipInstallPackage(ctx, logger, files, dir, 
extraPackage, true, false, nil); err != nil {
                                return fmt.Errorf("failed to install extra 
package %s: %v", extraPackage, err)
                        }
@@ -173,12 +176,13 @@ func installExtraPackages(ctx context.Context, logger 
*tools.Logger, files []str
        return nil
 }
 
-func findBeamSdkWhl(files []string, acceptableWhlSpecs []string) string {
+func findBeamSdkWhl(ctx context.Context, logger *tools.Logger, files []string, 
acceptableWhlSpecs []string) string {
+       bufLogger := tools.NewBufferedLogger(logger)
        for _, file := range files {
                if strings.HasPrefix(file, "apache_beam") {
                        for _, s := range acceptableWhlSpecs {
                                if strings.HasSuffix(file, s) {
-                                       log.Printf("Found Apache Beam SDK 
wheel: %v", file)
+                                       bufLogger.Printf(ctx, "Found Apache 
Beam SDK wheel: %v", file)
                                        return file
                                }
                        }
@@ -193,8 +197,8 @@ func findBeamSdkWhl(files []string, acceptableWhlSpecs 
[]string) string {
 // file, and we try to install it. If not successful, we fall back to 
installing
 // SDK from source tarball provided in sdkSrcFile.
 func installSdk(ctx context.Context, logger *tools.Logger, files []string, 
workDir string, sdkSrcFile string, acceptableWhlSpecs []string, required bool) 
error {
-       sdkWhlFile := findBeamSdkWhl(files, acceptableWhlSpecs)
-
+       sdkWhlFile := findBeamSdkWhl(ctx, logger, files, acceptableWhlSpecs)
+       bufLogger := tools.NewBufferedLogger(logger)
        if sdkWhlFile != "" {
                // by default, pip rejects to install wheel if same version 
already installed
                isDev := strings.Contains(sdkWhlFile, ".dev")
@@ -202,7 +206,7 @@ func installSdk(ctx context.Context, logger *tools.Logger, 
files []string, workD
                if err == nil {
                        return nil
                }
-               log.Printf("Could not install Apache Beam SDK from a wheel: %v, 
proceeding to install SDK from source tarball.", err)
+               bufLogger.Printf(ctx, "Could not install Apache Beam SDK from a 
wheel: %v, proceeding to install SDK from source tarball.", err)
        }
        if !required {
                _, err := os.Stat(filepath.Join(workDir, sdkSrcFile))

Reply via email to