This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2b9827b6c2d Refactor prism and go sdk logging and clean up messages
(#36484)
2b9827b6c2d is described below
commit 2b9827b6c2da3806f1134cc4e08ac7130a611205
Author: Shunping Huang <[email protected]>
AuthorDate: Mon Oct 13 20:39:16 2025 -0400
Refactor prism and go sdk logging and clean up messages (#36484)
* Move slog setup logic to sdk.
* Add wrapper for slog so sdk can share the same logging framework with
runner.
* Beautify and clean up some logging messages.
* Fix print tests.
---
sdks/go/cmd/prism/prism.go | 50 ++----------------
sdks/go/pkg/beam/core/runtime/harness/harness.go | 2 +-
sdks/go/pkg/beam/forward.go | 5 ++
sdks/go/pkg/beam/log/log.go | 61 +++++++++++++++++++++-
sdks/go/pkg/beam/log/structural.go | 39 ++++++++++++++
.../beam/runners/prism/internal/worker/worker.go | 8 ++-
.../go/pkg/beam/runners/universal/runnerlib/job.go | 12 ++++-
sdks/go/pkg/beam/runners/universal/universal.go | 3 +-
sdks/go/pkg/beam/x/debug/print_test.go | 9 +++-
9 files changed, 134 insertions(+), 55 deletions(-)
diff --git a/sdks/go/cmd/prism/prism.go b/sdks/go/cmd/prism/prism.go
index 5e3f42a9e5a..7fe9580e473 100644
--- a/sdks/go/cmd/prism/prism.go
+++ b/sdks/go/cmd/prism/prism.go
@@ -22,14 +22,10 @@ import (
"flag"
"fmt"
"log"
- "log/slog"
- "os"
- "strings"
- "time"
+ beamlog "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
jobpb
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
- "github.com/golang-cz/devslog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
@@ -44,57 +40,17 @@ var (
// Logging flags
var (
- logKind = flag.String("log_kind", "dev",
+ logKindFlag = flag.String("log_kind", "dev",
"Determines the format of prism's logging to std err: valid
values are `dev', 'json', or 'text'. Default is `dev`.")
logLevelFlag = flag.String("log_level", "info",
"Sets the minimum log level of Prism. Valid options are
'debug', 'info','warn', and 'error'. Default is 'info'. Debug adds prism source
lines.")
)
-var logLevel = new(slog.LevelVar)
-
func main() {
flag.Parse()
ctx, cancel := context.WithCancelCause(context.Background())
- var logHandler slog.Handler
- loggerOutput := os.Stderr
- handlerOpts := &slog.HandlerOptions{
- Level: logLevel,
- }
- switch strings.ToLower(*logLevelFlag) {
- case "debug":
- logLevel.Set(slog.LevelDebug)
- handlerOpts.AddSource = true
- case "info":
- logLevel.Set(slog.LevelInfo)
- case "warn":
- logLevel.Set(slog.LevelWarn)
- case "error":
- logLevel.Set(slog.LevelError)
- default:
- log.Fatalf("Invalid value for log_level: %v, must be 'debug',
'info', 'warn', or 'error'", *logKind)
- }
- switch strings.ToLower(*logKind) {
- case "dev":
- logHandler =
- devslog.NewHandler(loggerOutput, &devslog.Options{
- TimeFormat: "[" + time.RFC3339Nano +
"]",
- StringerFormatter: true,
- HandlerOptions: handlerOpts,
- StringIndentation: false,
- NewLineAfterLog: true,
- MaxErrorStackTrace: 3,
- })
- case "json":
- logHandler = slog.NewJSONHandler(loggerOutput, handlerOpts)
- case "text":
- logHandler = slog.NewTextHandler(loggerOutput, handlerOpts)
- default:
- log.Fatalf("Invalid value for log_kind: %v, must be 'dev',
'json', or 'text'", *logKind)
- }
-
- slog.SetDefault(slog.New(logHandler))
-
+ beamlog.SetupLogging(*logLevelFlag, *logKindFlag)
cli, err := makeJobClient(ctx,
prism.Options{
Port: *jobPort,
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go
b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index d75ae37c610..969ac1b0a64 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -101,7 +101,7 @@ func MainWithOptions(ctx context.Context, loggingEndpoint,
controlEndpoint strin
elmTimeout, err := parseTimeoutDurationFlag(ctx,
beam.PipelineOptions.Get("element_processing_timeout"))
if err != nil {
- log.Infof(ctx, "Failed to parse element_processing_timeout: %v,
there will be no timeout for processing an element in a PTransform operation",
err)
+ log.Debugf(ctx, "Failed to parse element_processing_timeout:
%v, there will be no timeout for processing an element in a PTransform
operation", err)
}
// Connect to FnAPI control server. Receive and execute work.
diff --git a/sdks/go/pkg/beam/forward.go b/sdks/go/pkg/beam/forward.go
index b2f610b703e..7b33ae1168d 100644
--- a/sdks/go/pkg/beam/forward.go
+++ b/sdks/go/pkg/beam/forward.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/genx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
)
// IMPLEMENTATION NOTE: functions and types in this file are assumed to be
@@ -51,6 +52,10 @@ func RegisterType(t reflect.Type) {
}
func init() {
+ runtime.RegisterInit(func() {
+ log.SetupLoggingWithDefault()
+ })
+
runtime.RegisterInit(func() {
if EnableSchemas {
schema.Initialize()
diff --git a/sdks/go/pkg/beam/log/log.go b/sdks/go/pkg/beam/log/log.go
index 4c1f5dddb01..784d1824e01 100644
--- a/sdks/go/pkg/beam/log/log.go
+++ b/sdks/go/pkg/beam/log/log.go
@@ -21,8 +21,14 @@ package log
import (
"context"
"fmt"
+ "log"
+ "log/slog"
"os"
+ "strings"
"sync/atomic"
+ "time"
+
+ "github.com/golang-cz/devslog"
)
// Severity is the severity of the log message.
@@ -37,6 +43,11 @@ const (
SevFatal
)
+var (
+ LogLevel = "info" // The logging level for slog. Valid values are
`debug`, `info`, `warn` or `error`. Default is `info`.
+ LogKind = "text" // The logging format for slog. Valid values are
`dev`, `json`, or `text`. Default is `text`.
+)
+
// Logger is a context-aware logging backend. The richer context allows for
// more sophisticated logging setups. Must be concurrency safe.
type Logger interface {
@@ -54,7 +65,7 @@ type concreteLogger struct {
}
func init() {
- logger.Store(&concreteLogger{&Standard{}})
+ logger.Store(&concreteLogger{&Structural{}})
}
// SetLogger sets the global Logger. Intended to be called during
initialization
@@ -190,3 +201,51 @@ func Exitln(ctx context.Context, v ...any) {
Output(ctx, SevFatal, 1, fmt.Sprintln(v...))
os.Exit(1)
}
+
+func SetupLoggingWithDefault() {
+ var logLevel = new(slog.LevelVar)
+ var logHandler slog.Handler
+ loggerOutput := os.Stderr
+ handlerOpts := &slog.HandlerOptions{
+ Level: logLevel,
+ }
+ switch strings.ToLower(LogLevel) {
+ case "debug":
+ logLevel.Set(slog.LevelDebug)
+ handlerOpts.AddSource = true
+ case "info":
+ logLevel.Set(slog.LevelInfo)
+ case "warn":
+ logLevel.Set(slog.LevelWarn)
+ case "error":
+ logLevel.Set(slog.LevelError)
+ default:
+ log.Fatalf("Invalid value for log_level: %v, must be 'debug',
'info', 'warn', or 'error'", LogLevel)
+ }
+ switch strings.ToLower(LogKind) {
+ case "dev":
+ logHandler =
+ devslog.NewHandler(loggerOutput, &devslog.Options{
+ TimeFormat: "[" + time.RFC3339Nano +
"]",
+ StringerFormatter: true,
+ HandlerOptions: handlerOpts,
+ StringIndentation: false,
+ NewLineAfterLog: true,
+ MaxErrorStackTrace: 3,
+ })
+ case "json":
+ logHandler = slog.NewJSONHandler(loggerOutput, handlerOpts)
+ case "text":
+ logHandler = slog.NewTextHandler(loggerOutput, handlerOpts)
+ default:
+ log.Fatalf("Invalid value for log_kind: %v, must be 'dev',
'json', or 'text'", LogKind)
+ }
+
+ slog.SetDefault(slog.New(logHandler))
+}
+
+func SetupLogging(logLevel, logKind string) {
+ LogLevel = logLevel
+ LogKind = logKind
+ SetupLoggingWithDefault()
+}
diff --git a/sdks/go/pkg/beam/log/structural.go
b/sdks/go/pkg/beam/log/structural.go
new file mode 100644
index 00000000000..4ba9cd1af77
--- /dev/null
+++ b/sdks/go/pkg/beam/log/structural.go
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package log
+
+import (
+ "context"
+ slogger "log/slog"
+)
+
+// Structural is a logging backend that forwards messages to the slog package.
+type Structural struct{}
+
+var loggerMap = map[Severity]func(string, ...any){
+ SevUnspecified: slogger.Info,
+ SevDebug: slogger.Debug,
+ SevInfo: slogger.Info,
+ SevWarn: slogger.Warn,
+ SevError: slogger.Error,
+ SevFatal: slogger.Error,
+}
+
+// Log forwards msg to the slog function mapped to sev. For SevFatal, it does
+// not call os.Exit(1) itself; that is left to the log wrapper (e.g. Exitln).
+func (s *Structural) Log(ctx context.Context, sev Severity, _ int, msg string)
{
+ loggerMap[sev](msg)
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
index 5668449f6c9..33c8c3a7de5 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -36,6 +36,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+ beamlog "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
@@ -224,7 +225,6 @@ func (wk *W) Logging(stream
fnpb.BeamFnLogging_LoggingServer) error {
slog.String("transformID", l.GetTransformId()),
// TODO: pull the unique name from the pipeline graph.
slog.String("location", l.GetLogLocation()),
slog.Time(slog.TimeKey,
l.GetTimestamp().AsTime()),
- slog.String(slog.MessageKey, l.GetMessage()),
}
if fs := l.GetCustomData().GetFields(); len(fs) > 0 {
var grp []any
@@ -245,7 +245,11 @@ func (wk *W) Logging(stream
fnpb.BeamFnLogging_LoggingServer) error {
attrs = append(attrs, slog.Group("customData",
grp...))
}
- slog.LogAttrs(stream.Context(),
toSlogSev(l.GetSeverity()), "log from SDK worker", slog.Any("worker", wk),
slog.Group("sdk", attrs...))
+ if beamlog.LogLevel == "debug" {
+ slog.LogAttrs(stream.Context(),
toSlogSev(l.GetSeverity()), "[SDK] "+l.GetMessage(), slog.Group("sdk",
attrs...), slog.Any("worker", wk))
+ } else {
+ slog.LogAttrs(stream.Context(),
toSlogSev(l.GetSeverity()), "[SDK] "+l.GetMessage())
+ }
}
}
}
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
index 7d6a3027e47..81ff5a5eb94 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
@@ -19,6 +19,7 @@ import (
"context"
"fmt"
"io"
+ "strings"
"github.com/apache/beam/sdks/v2/go/container/tools"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
@@ -138,7 +139,16 @@ func WaitForCompletion(ctx context.Context, client
jobpb.JobServiceClient, jobID
case msg.GetMessageResponse() != nil:
resp := msg.GetMessageResponse()
- text := fmt.Sprintf("%v (%v): %v", resp.GetTime(),
resp.GetMessageId(), resp.GetMessageText())
+ var b strings.Builder
+ if resp.GetTime() != "" {
+ fmt.Fprintf(&b, "(time=%v)", resp.GetTime())
+ }
+ if resp.GetMessageId() != "" {
+ fmt.Fprintf(&b, "(id=%v)", resp.GetMessageId())
+ }
+ b.WriteString(resp.GetMessageText())
+ text := b.String()
+
log.Output(ctx, messageSeverity(resp.GetImportance()),
1, text)
if resp.GetImportance() >=
jobpb.JobMessage_JOB_MESSAGE_ERROR {
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go
b/sdks/go/pkg/beam/runners/universal/universal.go
index c63175c5857..25325b8fe9c 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -23,6 +23,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx"
+ "google.golang.org/protobuf/encoding/prototext"
// Importing to get the side effect of the remote execution hook. See
init().
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/init"
@@ -92,7 +93,7 @@ func Execute(ctx context.Context, p *beam.Pipeline)
(beam.PipelineResult, error)
return nil, errors.WithContextf(err, "generating model
pipeline")
}
- log.Info(ctx, pipeline.String())
+ log.Debugf(ctx, "Pipeline proto: %s", prototext.Format(pipeline))
opt := &runnerlib.JobOptions{
Name: jobopts.GetJobName(),
diff --git a/sdks/go/pkg/beam/x/debug/print_test.go
b/sdks/go/pkg/beam/x/debug/print_test.go
index 0bbdee0b6fb..e064cabb1f7 100644
--- a/sdks/go/pkg/beam/x/debug/print_test.go
+++ b/sdks/go/pkg/beam/x/debug/print_test.go
@@ -18,6 +18,7 @@ package debug
import (
"bytes"
"log"
+ "log/slog"
"os"
"strings"
"testing"
@@ -92,10 +93,14 @@ func captureRunLogging(p *beam.Pipeline) string {
// Pipe output to out
var out bytes.Buffer
log.SetOutput(&out)
+ defer log.SetOutput(os.Stderr)
+
+ oldLogger := slog.Default()
+ logHandler := slog.NewTextHandler(&out, nil)
+ slog.SetDefault(slog.New(logHandler))
+ defer slog.SetDefault((oldLogger))
ptest.Run(p)
- // Return to original state
- log.SetOutput(os.Stderr)
return out.String()
}