This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b9827b6c2d Refactor prism and go sdk logging and clean up messages 
(#36484)
2b9827b6c2d is described below

commit 2b9827b6c2da3806f1134cc4e08ac7130a611205
Author: Shunping Huang <[email protected]>
AuthorDate: Mon Oct 13 20:39:16 2025 -0400

    Refactor prism and go sdk logging and clean up messages (#36484)
    
    * Move slog setup logic to sdk.
    
    * Add wrapper for slog so sdk can share the same logging framework with 
runner.
    
    * Beautify and clean up some logging messages.
    
    * Fix print tests.
---
 sdks/go/cmd/prism/prism.go                         | 50 ++----------------
 sdks/go/pkg/beam/core/runtime/harness/harness.go   |  2 +-
 sdks/go/pkg/beam/forward.go                        |  5 ++
 sdks/go/pkg/beam/log/log.go                        | 61 +++++++++++++++++++++-
 sdks/go/pkg/beam/log/structural.go                 | 39 ++++++++++++++
 .../beam/runners/prism/internal/worker/worker.go   |  8 ++-
 .../go/pkg/beam/runners/universal/runnerlib/job.go | 12 ++++-
 sdks/go/pkg/beam/runners/universal/universal.go    |  3 +-
 sdks/go/pkg/beam/x/debug/print_test.go             |  9 +++-
 9 files changed, 134 insertions(+), 55 deletions(-)

diff --git a/sdks/go/cmd/prism/prism.go b/sdks/go/cmd/prism/prism.go
index 5e3f42a9e5a..7fe9580e473 100644
--- a/sdks/go/cmd/prism/prism.go
+++ b/sdks/go/cmd/prism/prism.go
@@ -22,14 +22,10 @@ import (
        "flag"
        "fmt"
        "log"
-       "log/slog"
-       "os"
-       "strings"
-       "time"
 
+       beamlog "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        jobpb 
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
-       "github.com/golang-cz/devslog"
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
 )
@@ -44,57 +40,17 @@ var (
 
 // Logging flags
 var (
-       logKind = flag.String("log_kind", "dev",
+       logKindFlag = flag.String("log_kind", "dev",
                "Determines the format of prism's logging to std err: valid 
values are `dev', 'json', or 'text'. Default is `dev`.")
        logLevelFlag = flag.String("log_level", "info",
                "Sets the minimum log level of Prism. Valid options are 
'debug', 'info','warn', and 'error'. Default is 'info'. Debug adds prism source 
lines.")
 )
 
-var logLevel = new(slog.LevelVar)
-
 func main() {
        flag.Parse()
        ctx, cancel := context.WithCancelCause(context.Background())
 
-       var logHandler slog.Handler
-       loggerOutput := os.Stderr
-       handlerOpts := &slog.HandlerOptions{
-               Level: logLevel,
-       }
-       switch strings.ToLower(*logLevelFlag) {
-       case "debug":
-               logLevel.Set(slog.LevelDebug)
-               handlerOpts.AddSource = true
-       case "info":
-               logLevel.Set(slog.LevelInfo)
-       case "warn":
-               logLevel.Set(slog.LevelWarn)
-       case "error":
-               logLevel.Set(slog.LevelError)
-       default:
-               log.Fatalf("Invalid value for log_level: %v, must be 'debug', 
'info', 'warn', or 'error'", *logKind)
-       }
-       switch strings.ToLower(*logKind) {
-       case "dev":
-               logHandler =
-                       devslog.NewHandler(loggerOutput, &devslog.Options{
-                               TimeFormat:         "[" + time.RFC3339Nano + 
"]",
-                               StringerFormatter:  true,
-                               HandlerOptions:     handlerOpts,
-                               StringIndentation:  false,
-                               NewLineAfterLog:    true,
-                               MaxErrorStackTrace: 3,
-                       })
-       case "json":
-               logHandler = slog.NewJSONHandler(loggerOutput, handlerOpts)
-       case "text":
-               logHandler = slog.NewTextHandler(loggerOutput, handlerOpts)
-       default:
-               log.Fatalf("Invalid value for log_kind: %v, must be 'dev', 
'json', or 'text'", *logKind)
-       }
-
-       slog.SetDefault(slog.New(logHandler))
-
+       beamlog.SetupLogging(*logLevelFlag, *logKindFlag)
        cli, err := makeJobClient(ctx,
                prism.Options{
                        Port:                *jobPort,
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go 
b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index d75ae37c610..969ac1b0a64 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -101,7 +101,7 @@ func MainWithOptions(ctx context.Context, loggingEndpoint, 
controlEndpoint strin
 
        elmTimeout, err := parseTimeoutDurationFlag(ctx, 
beam.PipelineOptions.Get("element_processing_timeout"))
        if err != nil {
-               log.Infof(ctx, "Failed to parse element_processing_timeout: %v, 
there will be no timeout for processing an element in a PTransform operation", 
err)
+               log.Debugf(ctx, "Failed to parse element_processing_timeout: 
%v, there will be no timeout for processing an element in a PTransform 
operation", err)
        }
 
        // Connect to FnAPI control server. Receive and execute work.
diff --git a/sdks/go/pkg/beam/forward.go b/sdks/go/pkg/beam/forward.go
index b2f610b703e..7b33ae1168d 100644
--- a/sdks/go/pkg/beam/forward.go
+++ b/sdks/go/pkg/beam/forward.go
@@ -24,6 +24,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/genx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
 )
 
 // IMPLEMENTATION NOTE: functions and types in this file are assumed to be
@@ -51,6 +52,10 @@ func RegisterType(t reflect.Type) {
 }
 
 func init() {
+       runtime.RegisterInit(func() {
+               log.SetupLoggingWithDefault()
+       })
+
        runtime.RegisterInit(func() {
                if EnableSchemas {
                        schema.Initialize()
diff --git a/sdks/go/pkg/beam/log/log.go b/sdks/go/pkg/beam/log/log.go
index 4c1f5dddb01..784d1824e01 100644
--- a/sdks/go/pkg/beam/log/log.go
+++ b/sdks/go/pkg/beam/log/log.go
@@ -21,8 +21,14 @@ package log
 import (
        "context"
        "fmt"
+       "log"
+       "log/slog"
        "os"
+       "strings"
        "sync/atomic"
+       "time"
+
+       "github.com/golang-cz/devslog"
 )
 
 // Severity is the severity of the log message.
@@ -37,6 +43,11 @@ const (
        SevFatal
 )
 
+var (
+       LogLevel = "info" // The logging level for slog. Valid values are 
`debug`, `info`, `warn` or `error`. Default is `info`.
+       LogKind  = "text" // The logging format for slog. Valid values are 
`dev', 'json', or 'text'. Default is `text`.
+)
+
 // Logger is a context-aware logging backend. The richer context allows for
 // more sophisticated logging setups. Must be concurrency safe.
 type Logger interface {
@@ -54,7 +65,7 @@ type concreteLogger struct {
 }
 
 func init() {
-       logger.Store(&concreteLogger{&Standard{}})
+       logger.Store(&concreteLogger{&Structural{}})
 }
 
 // SetLogger sets the global Logger. Intended to be called during 
initialization
@@ -190,3 +201,51 @@ func Exitln(ctx context.Context, v ...any) {
        Output(ctx, SevFatal, 1, fmt.Sprintln(v...))
        os.Exit(1)
 }
+
+func SetupLoggingWithDefault() {
+       var logLevel = new(slog.LevelVar)
+       var logHandler slog.Handler
+       loggerOutput := os.Stderr
+       handlerOpts := &slog.HandlerOptions{
+               Level: logLevel,
+       }
+       switch strings.ToLower(LogLevel) {
+       case "debug":
+               logLevel.Set(slog.LevelDebug)
+               handlerOpts.AddSource = true
+       case "info":
+               logLevel.Set(slog.LevelInfo)
+       case "warn":
+               logLevel.Set(slog.LevelWarn)
+       case "error":
+               logLevel.Set(slog.LevelError)
+       default:
+               log.Fatalf("Invalid value for log_level: %v, must be 'debug', 
'info', 'warn', or 'error'", LogLevel)
+       }
+       switch strings.ToLower(LogKind) {
+       case "dev":
+               logHandler =
+                       devslog.NewHandler(loggerOutput, &devslog.Options{
+                               TimeFormat:         "[" + time.RFC3339Nano + 
"]",
+                               StringerFormatter:  true,
+                               HandlerOptions:     handlerOpts,
+                               StringIndentation:  false,
+                               NewLineAfterLog:    true,
+                               MaxErrorStackTrace: 3,
+                       })
+       case "json":
+               logHandler = slog.NewJSONHandler(loggerOutput, handlerOpts)
+       case "text":
+               logHandler = slog.NewTextHandler(loggerOutput, handlerOpts)
+       default:
+               log.Fatalf("Invalid value for log_kind: %v, must be 'dev', 
'json', or 'text'", LogKind)
+       }
+
+       slog.SetDefault(slog.New(logHandler))
+}
+
+func SetupLogging(logLevel, logKind string) {
+       LogLevel = logLevel
+       LogKind = logKind
+       SetupLoggingWithDefault()
+}
diff --git a/sdks/go/pkg/beam/log/structural.go 
b/sdks/go/pkg/beam/log/structural.go
new file mode 100644
index 00000000000..4ba9cd1af77
--- /dev/null
+++ b/sdks/go/pkg/beam/log/structural.go
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package log
+
+import (
+       "context"
+       slogger "log/slog"
+)
+
+// Structural is a wrapper over slog
+type Structural struct{}
+
+var loggerMap = map[Severity]func(string, ...any){
+       SevUnspecified: slogger.Info,
+       SevDebug:       slogger.Debug,
+       SevInfo:        slogger.Info,
+       SevWarn:        slogger.Warn,
+       SevError:       slogger.Error,
+       SevFatal:       slogger.Error,
+}
+
+// Log logs the message to the structural Go logger. For Panic, it does not
+// perform the os.Exit(1) call, but defers to the log wrapper.
+func (s *Structural) Log(ctx context.Context, sev Severity, _ int, msg string) 
{
+       loggerMap[sev](msg)
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go 
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
index 5668449f6c9..33c8c3a7de5 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -36,6 +36,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+       beamlog "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
        
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
@@ -224,7 +225,6 @@ func (wk *W) Logging(stream 
fnpb.BeamFnLogging_LoggingServer) error {
                                slog.String("transformID", l.GetTransformId()), 
// TODO: pull the unique name from the pipeline graph.
                                slog.String("location", l.GetLogLocation()),
                                slog.Time(slog.TimeKey, 
l.GetTimestamp().AsTime()),
-                               slog.String(slog.MessageKey, l.GetMessage()),
                        }
                        if fs := l.GetCustomData().GetFields(); len(fs) > 0 {
                                var grp []any
@@ -245,7 +245,11 @@ func (wk *W) Logging(stream 
fnpb.BeamFnLogging_LoggingServer) error {
                                attrs = append(attrs, slog.Group("customData", 
grp...))
                        }
 
-                       slog.LogAttrs(stream.Context(), 
toSlogSev(l.GetSeverity()), "log from SDK worker", slog.Any("worker", wk), 
slog.Group("sdk", attrs...))
+                       if beamlog.LogLevel == "debug" {
+                               slog.LogAttrs(stream.Context(), 
toSlogSev(l.GetSeverity()), "[SDK] "+l.GetMessage(), slog.Group("sdk", 
attrs...), slog.Any("worker", wk))
+                       } else {
+                               slog.LogAttrs(stream.Context(), 
toSlogSev(l.GetSeverity()), "[SDK] "+l.GetMessage())
+                       }
                }
        }
 }
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go 
b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
index 7d6a3027e47..81ff5a5eb94 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
@@ -19,6 +19,7 @@ import (
        "context"
        "fmt"
        "io"
+       "strings"
 
        "github.com/apache/beam/sdks/v2/go/container/tools"
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
@@ -138,7 +139,16 @@ func WaitForCompletion(ctx context.Context, client 
jobpb.JobServiceClient, jobID
                case msg.GetMessageResponse() != nil:
                        resp := msg.GetMessageResponse()
 
-                       text := fmt.Sprintf("%v (%v): %v", resp.GetTime(), 
resp.GetMessageId(), resp.GetMessageText())
+                       var b strings.Builder
+                       if resp.GetTime() != "" {
+                               fmt.Fprintf(&b, "(time=%v)", resp.GetTime())
+                       }
+                       if resp.GetMessageId() != "" {
+                               fmt.Fprintf(&b, "(id=%v)", resp.GetMessageId())
+                       }
+                       b.WriteString(resp.GetMessageText())
+                       text := b.String()
+
                        log.Output(ctx, messageSeverity(resp.GetImportance()), 
1, text)
 
                        if resp.GetImportance() >= 
jobpb.JobMessage_JOB_MESSAGE_ERROR {
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go 
b/sdks/go/pkg/beam/runners/universal/universal.go
index c63175c5857..25325b8fe9c 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -23,6 +23,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx"
+       "google.golang.org/protobuf/encoding/prototext"
 
        // Importing to get the side effect of the remote execution hook. See 
init().
        _ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/init"
@@ -92,7 +93,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) 
(beam.PipelineResult, error)
                return nil, errors.WithContextf(err, "generating model 
pipeline")
        }
 
-       log.Info(ctx, pipeline.String())
+       log.Debugf(ctx, "Pipeline proto: %s", prototext.Format(pipeline))
 
        opt := &runnerlib.JobOptions{
                Name:         jobopts.GetJobName(),
diff --git a/sdks/go/pkg/beam/x/debug/print_test.go 
b/sdks/go/pkg/beam/x/debug/print_test.go
index 0bbdee0b6fb..e064cabb1f7 100644
--- a/sdks/go/pkg/beam/x/debug/print_test.go
+++ b/sdks/go/pkg/beam/x/debug/print_test.go
@@ -18,6 +18,7 @@ package debug
 import (
        "bytes"
        "log"
+       "log/slog"
        "os"
        "strings"
        "testing"
@@ -92,10 +93,14 @@ func captureRunLogging(p *beam.Pipeline) string {
        // Pipe output to out
        var out bytes.Buffer
        log.SetOutput(&out)
+       defer log.SetOutput(os.Stderr)
+
+       oldLogger := slog.Default()
+       logHandler := slog.NewTextHandler(&out, nil)
+       slog.SetDefault(slog.New(logHandler))
+       defer slog.SetDefault((oldLogger))
 
        ptest.Run(p)
 
-       // Return to original state
-       log.SetOutput(os.Stderr)
        return out.String()
 }

Reply via email to