This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ddcf5875730 Route remaining prints in boot.go through logging service
if possible, add periodic flush for PIP invocations (#28413)
ddcf5875730 is described below
commit ddcf58757302401a1f714eb2167069b13ef1645a
Author: Jack McCluskey <[email protected]>
AuthorDate: Wed Sep 13 10:09:05 2023 -0400
Route remaining prints in boot.go through logging service if possible, add
periodic flush for PIP invocations (#28413)
* Route remaining prints in boot.go through logging service if possible
* Add periodic flush option for pip invocation
* Reset lastFlush properly
* Address comments
* Set interval to 15 seconds
---
sdks/go/container/tools/buffered_logging.go | 36 ++++++++++--
sdks/go/container/tools/buffered_logging_test.go | 72 ++++++++++++++++++++++++
sdks/python/container/boot.go | 5 +-
sdks/python/container/piputil.go | 22 +++++---
4 files changed, 120 insertions(+), 15 deletions(-)
diff --git a/sdks/go/container/tools/buffered_logging.go
b/sdks/go/container/tools/buffered_logging.go
index ef5e8310c3b..445d19fabfd 100644
--- a/sdks/go/container/tools/buffered_logging.go
+++ b/sdks/go/container/tools/buffered_logging.go
@@ -17,8 +17,11 @@ package tools
import (
"context"
+ "log"
+ "math"
"os"
"strings"
+ "time"
)
const initialLogSize int = 255
@@ -27,14 +30,24 @@ const initialLogSize int = 255
// in place of stdout and stderr in bootloader subprocesses. Not intended for
// Beam end users.
type BufferedLogger struct {
- logger *Logger
- builder strings.Builder
- logs []string
+ logger *Logger
+ builder strings.Builder
+ logs []string // messages appended by Write and reset to nil by the Flush* methods
+ lastFlush time.Time // set at construction and again at the end of every Flush* call
+ flushInterval time.Duration // Write flushes at debug once more than this has elapsed since lastFlush
+ periodicFlushContext context.Context // context handed to FlushAtDebug for the flush triggered from Write
+ now func() time.Time // clock read by Write; both constructors set it to time.Now
}
// NewBufferedLogger returns a new BufferedLogger type by reference.
+// The flush interval is set to the largest representable Duration, so the
+// elapsed-time comparison in Write can never exceed it: logs are only emitted
+// by explicit Flush*() calls.
func NewBufferedLogger(logger *Logger) *BufferedLogger {
- return &BufferedLogger{logger: logger}
+ return &BufferedLogger{logger: logger, lastFlush: time.Now(),
flushInterval: time.Duration(math.MaxInt64), periodicFlushContext:
context.Background(), now: time.Now}
+}
+
+// NewBufferedLoggerWithFlushInterval returns a new BufferedLogger type by
reference. This type will
+// flush logs periodically on Write() calls as well as when Flush*() functions
are called.
+// The ctx argument is stored and passed to FlushAtDebug when Write triggers a
+// flush; interval is the time since the last flush that must be exceeded
+// before Write flushes. No timer is started here: the check only runs inside Write.
+func NewBufferedLoggerWithFlushInterval(ctx context.Context, logger *Logger,
interval time.Duration) *BufferedLogger {
+ return &BufferedLogger{logger: logger, lastFlush: time.Now(),
flushInterval: interval, periodicFlushContext: ctx, now: time.Now}
}
// Write implements the io.Writer interface, converting input to a string
@@ -50,6 +63,9 @@ func (b *BufferedLogger) Write(p []byte) (int, error) {
}
b.logs = append(b.logs, b.builder.String())
b.builder.Reset()
+ if b.now().Sub(b.lastFlush) > b.flushInterval {
+ b.FlushAtDebug(b.periodicFlushContext)
+ }
return n, err
}
@@ -63,6 +79,7 @@ func (b *BufferedLogger) FlushAtError(ctx context.Context) {
b.logger.Errorf(ctx, message)
}
b.logs = nil
+ b.lastFlush = time.Now()
}
// FlushAtDebug flushes the contents of the buffer to the logging
@@ -75,4 +92,15 @@ func (b *BufferedLogger) FlushAtDebug(ctx context.Context) {
b.logger.Printf(ctx, message)
}
b.logs = nil
+ b.lastFlush = time.Now()
+}
+
+// Printf prints directly to the logging service. If the logger is nil, prints
directly to the
+// console via the standard log package. Used for the container pre-build workflow.
+// It bypasses the buffer: b.logs is neither read nor modified.
+func (b *BufferedLogger) Printf(ctx context.Context, format string, args
...any) {
+ if b.logger == nil {
+ log.Printf(format, args...)
+ return
+ }
+ b.logger.Printf(ctx, format, args...)
}
diff --git a/sdks/go/container/tools/buffered_logging_test.go
b/sdks/go/container/tools/buffered_logging_test.go
index 8feef7b413d..9f542d2d5ab 100644
--- a/sdks/go/container/tools/buffered_logging_test.go
+++ b/sdks/go/container/tools/buffered_logging_test.go
@@ -18,6 +18,7 @@ package tools
import (
"context"
"testing"
+ "time"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
)
@@ -166,4 +167,75 @@ func TestBufferedLogger(t *testing.T) {
}
}
})
+
+ t.Run("direct print", func(t *testing.T) {
+ catcher := &logCatcher{}
+ l := &Logger{client: catcher}
+ bl := NewBufferedLogger(l)
+
+ bl.Printf(ctx, "foo %v", "bar")
+
+ received := catcher.msgs[0].GetLogEntries()[0]
+
+ if got, want := received.Message, "foo bar"; got != want {
+ t.Errorf("l.Printf(\"foo %%v\", \"bar\"): got message
%q, want %q", got, want)
+ }
+
+ if got, want := received.Severity,
fnpb.LogEntry_Severity_DEBUG; got != want {
+ t.Errorf("l.Printf(\"foo %%v\", \"bar\"): got severity
%v, want %v", got, want)
+ }
+ })
+
+ t.Run("debug flush at interval", func(t *testing.T) {
+ catcher := &logCatcher{}
+ l := &Logger{client: catcher}
+ interval := 5 * time.Second
+ bl := NewBufferedLoggerWithFlushInterval(context.Background(),
l, interval)
+
+ startTime := time.Now()
+ bl.now = func() time.Time { return startTime }
+
+ messages := []string{"foo", "bar"}
+
+ for i, message := range messages {
+ if i > 1 {
+ bl.now = func() time.Time { return
startTime.Add(6 * time.Second) }
+ }
+ messBytes := []byte(message)
+ n, err := bl.Write(messBytes)
+
+ if err != nil {
+ t.Errorf("got error %v", err)
+ }
+ if got, want := n, len(messBytes); got != want {
+ t.Errorf("got %d bytes written, want %d", got,
want)
+ }
+ }
+
+ lastMessage := "baz"
+ bl.now = func() time.Time { return startTime.Add(6 *
time.Second) }
+ messBytes := []byte(lastMessage)
+ n, err := bl.Write(messBytes)
+
+ if err != nil {
+ t.Errorf("got error %v", err)
+ }
+ if got, want := n, len(messBytes); got != want {
+ t.Errorf("got %d bytes written, want %d", got, want)
+ }
+
+ // Type should have auto-flushed at debug after the third
message
+ received := catcher.msgs[0].GetLogEntries()
+ messages = append(messages, lastMessage)
+
+ for i, message := range received {
+ if got, want := message.Message, messages[i]; got !=
want {
+ t.Errorf("got message %q, want %q", got, want)
+ }
+
+ if got, want := message.Severity,
fnpb.LogEntry_Severity_DEBUG; got != want {
+ t.Errorf("got severity %v, want %v", got, want)
+ }
+ }
+ })
}
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index da3b4a1536e..ded10a44204 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -378,10 +378,11 @@ func setupAcceptableWheelSpecs() error {
// installSetupPackages installs Beam SDK and user dependencies.
func installSetupPackages(ctx context.Context, logger *tools.Logger, files
[]string, workDir string, requirementsFiles []string) error {
- log.Printf("Installing setup packages ...")
+ bufLogger := tools.NewBufferedLogger(logger)
+ bufLogger.Printf(ctx, "Installing setup packages ...")
if err := setupAcceptableWheelSpecs(); err != nil {
- log.Printf("Failed to setup acceptable wheel specs, leave it as
empty: %v", err)
+ bufLogger.Printf(ctx, "Failed to setup acceptable wheel specs,
leave it as empty: %v", err)
}
pkgName := "apache-beam"
diff --git a/sdks/python/container/piputil.go b/sdks/python/container/piputil.go
index fec5cf0ab50..67488bdc39f 100644
--- a/sdks/python/container/piputil.go
+++ b/sdks/python/container/piputil.go
@@ -21,11 +21,11 @@ import (
"context"
"errors"
"fmt"
- "log"
"os"
"os/exec"
"path/filepath"
"strings"
+ "time"
"github.com/apache/beam/sdks/v2/go/container/tools"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
@@ -47,7 +47,7 @@ func pipInstallRequirements(ctx context.Context, logger
*tools.Logger, files []s
// used without following their dependencies.
args := []string{"-m", "pip", "install", "-q", "-r",
filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check",
"--no-index", "--no-deps", "--find-links", dir}
if err := execx.Execute(pythonVersion, args...); err !=
nil {
- fmt.Println("Some packages could not be
installed solely from the requirements cache. Installing packages from PyPI.")
+ bufLogger.Printf(ctx, "Some packages could not
be installed solely from the requirements cache. Installing packages from
PyPI.")
}
// The second install round opens up the search for
packages on PyPI and
// also installs dependencies. The key is that if all
the packages have
@@ -77,13 +77,15 @@ func isPackageInstalled(pkgName string) bool {
return true
}
+const pipLogFlushInterval time.Duration = 15 * time.Second // flush interval given to the buffered logger created in pipInstallPackage
+
// pipInstallPackage installs the given package, if present.
func pipInstallPackage(ctx context.Context, logger *tools.Logger, files
[]string, dir, name string, force, optional bool, extras []string) error {
pythonVersion, err := expansionx.GetPythonVersion()
if err != nil {
return err
}
- bufLogger := tools.NewBufferedLogger(logger)
+ bufLogger := tools.NewBufferedLoggerWithFlushInterval(ctx, logger,
pipLogFlushInterval)
for _, file := range files {
if file == name {
var packageSpec = name
@@ -146,6 +148,7 @@ func pipInstallPackage(ctx context.Context, logger
*tools.Logger, files []string
// installExtraPackages installs all the packages declared in the extra
// packages manifest file.
func installExtraPackages(ctx context.Context, logger *tools.Logger, files
[]string, extraPackagesFile, dir string) error {
+ bufLogger := tools.NewBufferedLogger(logger)
// First check that extra packages manifest file is present.
for _, file := range files {
if file != extraPackagesFile {
@@ -163,7 +166,7 @@ func installExtraPackages(ctx context.Context, logger
*tools.Logger, files []str
for s.Scan() {
extraPackage := s.Text()
- log.Printf("Installing extra package: %s", extraPackage)
+ bufLogger.Printf(ctx, "Installing extra package: %s",
extraPackage)
if err = pipInstallPackage(ctx, logger, files, dir,
extraPackage, true, false, nil); err != nil {
return fmt.Errorf("failed to install extra
package %s: %v", extraPackage, err)
}
@@ -173,12 +176,13 @@ func installExtraPackages(ctx context.Context, logger
*tools.Logger, files []str
return nil
}
-func findBeamSdkWhl(files []string, acceptableWhlSpecs []string) string {
+func findBeamSdkWhl(ctx context.Context, logger *tools.Logger, files []string,
acceptableWhlSpecs []string) string {
+ bufLogger := tools.NewBufferedLogger(logger)
for _, file := range files {
if strings.HasPrefix(file, "apache_beam") {
for _, s := range acceptableWhlSpecs {
if strings.HasSuffix(file, s) {
- log.Printf("Found Apache Beam SDK
wheel: %v", file)
+ bufLogger.Printf(ctx, "Found Apache
Beam SDK wheel: %v", file)
return file
}
}
@@ -193,8 +197,8 @@ func findBeamSdkWhl(files []string, acceptableWhlSpecs
[]string) string {
// file, and we try to install it. If not successful, we fall back to
installing
// SDK from source tarball provided in sdkSrcFile.
func installSdk(ctx context.Context, logger *tools.Logger, files []string,
workDir string, sdkSrcFile string, acceptableWhlSpecs []string, required bool)
error {
- sdkWhlFile := findBeamSdkWhl(files, acceptableWhlSpecs)
-
+ sdkWhlFile := findBeamSdkWhl(ctx, logger, files, acceptableWhlSpecs)
+ bufLogger := tools.NewBufferedLogger(logger)
if sdkWhlFile != "" {
// by default, pip rejects to install wheel if same version
already installed
isDev := strings.Contains(sdkWhlFile, ".dev")
@@ -202,7 +206,7 @@ func installSdk(ctx context.Context, logger *tools.Logger,
files []string, workD
if err == nil {
return nil
}
- log.Printf("Could not install Apache Beam SDK from a wheel: %v,
proceeding to install SDK from source tarball.", err)
+ bufLogger.Printf(ctx, "Could not install Apache Beam SDK from a
wheel: %v, proceeding to install SDK from source tarball.", err)
}
if !required {
_, err := os.Stat(filepath.Join(workDir, sdkSrcFile))