This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch bootloaderLogging
in repository https://gitbox.apache.org/repos/asf/beam.git

commit a3d86420330ba916ef1b342d1993aa6055df7435
Author: lostluck <[email protected]>
AuthorDate: Wed Mar 29 14:44:28 2023 -0700

    (#25314) Have boot loaders log to logging service where possible.
---
 sdks/go/container/boot.go                 |  33 +++++----
 sdks/go/container/tools/logging.go        | 117 ++++++++++++++++++++++++++++++
 sdks/go/container/tools/logging_test.go   | 115 +++++++++++++++++++++++++++++
 sdks/go/container/tools/provision.go      |  89 +++++++++++++++++++++++
 sdks/go/container/tools/provision_test.go | 110 ++++++++++++++++++++++++++++
 sdks/java/container/boot.go               |  47 ++++++------
 sdks/python/container/boot.go             |  62 +++++++---------
 7 files changed, 500 insertions(+), 73 deletions(-)

diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go
index db851048271..1a5e154ace7 100644
--- a/sdks/go/container/boot.go
+++ b/sdks/go/container/boot.go
@@ -28,6 +28,7 @@ import (
        "strings"
        "time"
 
+       "github.com/apache/beam/sdks/v2/go/container/tools"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
 
@@ -35,7 +36,6 @@ import (
        _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
-       "github.com/apache/beam/sdks/v2/go/pkg/beam/provision"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/diagnostics"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
@@ -58,7 +58,7 @@ const (
        enableGoogleCloudProfilerOption = "enable_google_cloud_profiler"
 )
 
-func configureGoogleCloudProfilerEnvVars(metadata map[string]string) error {
+func configureGoogleCloudProfilerEnvVars(ctx context.Context, logger 
*tools.Logger, metadata map[string]string) error {
        if metadata == nil {
                return errors.New("enable_google_cloud_profiler is set to true, 
but no metadata is received from provision server, profiling will not be 
enabled")
        }
@@ -72,7 +72,7 @@ func configureGoogleCloudProfilerEnvVars(metadata 
map[string]string) error {
        }
        os.Setenv(cloudProfilingJobName, jobName)
        os.Setenv(cloudProfilingJobID, jobID)
-       log.Printf("Cloud Profiling Job Name: %v, Job IDL %v", jobName, jobID)
+       logger.Printf(ctx, "Cloud Profiling Job Name: %v, Job IDL %v", jobName, 
jobID)
        return nil
 }
 
@@ -87,7 +87,7 @@ func main() {
 
        ctx := grpcx.WriteWorkerID(context.Background(), *id)
 
-       info, err := provision.Info(ctx, *provisionEndpoint)
+       info, err := tools.ProvisionInfo(ctx, *provisionEndpoint)
        if err != nil {
                log.Fatalf("Failed to obtain provisioning information: %v", err)
        }
@@ -97,13 +97,14 @@ func main() {
        if err != nil {
                log.Fatalf("Endpoint not set: %v", err)
        }
-       log.Printf("Initializing Go harness: %v", strings.Join(os.Args, " "))
+       logger := &tools.Logger{Endpoint: *loggingEndpoint}
+       logger.Printf(ctx, "Initializing Go harness: %v", strings.Join(os.Args, 
" "))
 
        // (1) Obtain the pipeline options
 
-       options, err := provision.ProtoToJSON(info.GetPipelineOptions())
+       options, err := tools.ProtoToJSON(info.GetPipelineOptions())
        if err != nil {
-               log.Fatalf("Failed to convert pipeline options: %v", err)
+               logger.Fatalf(ctx, "Failed to convert pipeline options: %v", 
err)
        }
 
        // (2) Retrieve the staged files.
@@ -115,19 +116,19 @@ func main() {
        dir := filepath.Join(*semiPersistDir, "staged")
        artifacts, err := artifact.Materialize(ctx, *artifactEndpoint, 
info.GetDependencies(), info.GetRetrievalToken(), dir)
        if err != nil {
-               log.Fatalf("Failed to retrieve staged files: %v", err)
+               logger.Fatalf(ctx, "Failed to retrieve staged files: %v", err)
        }
 
-       name, err := getGoWorkerArtifactName(artifacts)
+       name, err := getGoWorkerArtifactName(ctx, logger, artifacts)
        if err != nil {
-               log.Fatalf("Failed to get Go Worker Artifact Name: %v", err)
+               logger.Fatalf(ctx, "Failed to get Go Worker Artifact Name: %v", 
err)
        }
 
        // (3) The persist dir may be on a noexec volume, so we must
        // copy the binary to a different location to execute.
        const prog = "/bin/worker"
        if err := copyExe(filepath.Join(dir, name), prog); err != nil {
-               log.Fatalf("Failed to copy worker binary: %v", err)
+               logger.Fatalf(ctx, "Failed to copy worker binary: %v", err)
        }
 
        args := []string{
@@ -148,9 +149,9 @@ func main() {
 
        enableGoogleCloudProfiler := strings.Contains(options, 
enableGoogleCloudProfilerOption)
        if enableGoogleCloudProfiler {
-               err := configureGoogleCloudProfilerEnvVars(info.Metadata)
+               err := configureGoogleCloudProfilerEnvVars(ctx, logger, 
info.Metadata)
                if err != nil {
-                       log.Printf("could not configure Google Cloud Profiler 
variables, got %v", err)
+                       logger.Printf(ctx, "could not configure Google Cloud 
Profiler variables, got %v", err)
                }
        }
 
@@ -166,10 +167,10 @@ func main() {
                }
        }
 
-       log.Fatalf("User program exited: %v", err)
+       logger.Fatalf(ctx, "User program exited: %v", err)
 }
 
-func getGoWorkerArtifactName(artifacts []*pipepb.ArtifactInformation) (string, 
error) {
+func getGoWorkerArtifactName(ctx context.Context, logger *tools.Logger, 
artifacts []*pipepb.ArtifactInformation) (string, error) {
        const worker = "worker"
        name := worker
 
@@ -190,7 +191,7 @@ func getGoWorkerArtifactName(artifacts 
[]*pipepb.ArtifactInformation) (string, e
                for _, a := range artifacts {
                        n, _ := artifact.MustExtractFilePayload(a)
                        if n == worker {
-                               log.Printf("Go worker binary found with legacy 
name '%v'", worker)
+                               logger.Printf(ctx, "Go worker binary found with 
legacy name '%v'", worker)
                                return n, nil
                        }
                }
diff --git a/sdks/go/container/tools/logging.go 
b/sdks/go/container/tools/logging.go
new file mode 100644
index 00000000000..fd8ed50e020
--- /dev/null
+++ b/sdks/go/container/tools/logging.go
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tools
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "log"
+       "os"
+       "sync"
+       "time"
+
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+       "google.golang.org/protobuf/types/known/timestamppb"
+)
+
+// Logger is a wrapper around the FnAPI Logging Client, intended for
+// container boot loader use. Not intended for Beam end users.
+//
+// The zero value is usable. The connection to Endpoint is opened lazily by
+// the first call to Log; if Endpoint is empty or the service can't be
+// reached, messages are written to the standard library logger instead.
+type Logger struct {
+       // Endpoint is the address of the FnAPI logging service.
+       Endpoint string
+
+       // client is the logging stream. Log opens it on first use and sets
+       // closeFn to release the underlying connection; tests may inject a
+       // fake client directly, leaving closeFn nil.
+       client  logSender
+       closeFn func()
+       mu      sync.Mutex // Protects Send in the rare case multiple goroutines are calling this logger.
+}
+
+// logSender is the subset of the FnAPI logging stream client that Logger
+// uses, so that tests can substitute a fake.
+type logSender interface {
+       Send(*fnpb.LogEntry_List) error
+       CloseSend() error
+}
+
+// Close releases the logging stream and the connection opened by Log, if any.
+// It is a no-op on a Logger that never connected and is safe to call more
+// than once.
+//
+// Close does not take l.mu, so callers must not run it concurrently with Log.
+func (l *Logger) Close() {
+       if l.closeFn == nil {
+               return
+       }
+       // Guard against a nil client so Close never panics, even if closeFn was
+       // set without a stream having been opened.
+       if l.client != nil {
+               l.client.CloseSend()
+       }
+       l.closeFn()
+       l.closeFn = nil
+       l.client = nil
+}
+
+// Log sends message to the FnAPI logging service at l.Endpoint with severity
+// sev. The stream is opened on first use (grpcx.Dial is given a 2 minute
+// timeout) and reused afterwards; l.mu is held for the whole call.
+//
+// Logging is best effort: if no endpoint is set, the service can't be reached,
+// or the send fails, the error and the message are written to the standard
+// library logger instead, and nothing is returned to the caller.
+func (l *Logger) Log(ctx context.Context, sev fnpb.LogEntry_Severity_Enum, message string) {
+       l.mu.Lock()
+       defer l.mu.Unlock()
+
+       var exitErr error
+       defer func() {
+               if exitErr != nil {
+                       log.Println("boot.go: error logging message over FnAPI. endpoint", l.Endpoint, "error:", exitErr, "message follows")
+                       log.Println(sev.String(), message)
+               }
+       }()
+       if l.client == nil {
+               if l.Endpoint == "" {
+                       exitErr = errors.New("no logging endpoint set")
+                       return
+               }
+               cc, err := grpcx.Dial(ctx, l.Endpoint, 2*time.Minute)
+               if err != nil {
+                       exitErr = err
+                       return
+               }
+               client, err := fnpb.NewBeamFnLoggingClient(cc).Logging(ctx)
+               if err != nil {
+                       // No stream was opened (client is nil), so only the connection
+                       // needs releasing. l.client stays nil and the next call retries.
+                       exitErr = err
+                       cc.Close()
+                       return
+               }
+               // Only record the stream and its cleanup once both exist.
+               l.client = client
+               l.closeFn = func() { cc.Close() }
+       }
+
+       if err := l.client.Send(&fnpb.LogEntry_List{
+               LogEntries: []*fnpb.LogEntry{
+                       {
+                               Severity:  sev,
+                               Timestamp: timestamppb.Now(),
+                               Message:   message,
+                       },
+               },
+       }); err != nil {
+               exitErr = err
+       }
+}
+
+// Printf formats according to format and logs the result at DEBUG severity.
+func (l *Logger) Printf(ctx context.Context, format string, args ...any) {
+       l.logf(ctx, fnpb.LogEntry_Severity_DEBUG, format, args...)
+}
+
+// Warnf formats according to format and logs the result at WARN severity.
+func (l *Logger) Warnf(ctx context.Context, format string, args ...any) {
+       l.logf(ctx, fnpb.LogEntry_Severity_WARN, format, args...)
+}
+
+// Fatalf formats according to format, logs the result at CRITICAL severity,
+// and then terminates the process with os.Exit(1); deferred functions do not
+// run.
+func (l *Logger) Fatalf(ctx context.Context, format string, args ...any) {
+       l.logf(ctx, fnpb.LogEntry_Severity_CRITICAL, format, args...)
+       os.Exit(1)
+}
+
+// logf formats the message and hands it to Log with severity sev.
+func (l *Logger) logf(ctx context.Context, sev fnpb.LogEntry_Severity_Enum, format string, args ...any) {
+       l.Log(ctx, sev, fmt.Sprintf(format, args...))
+}
diff --git a/sdks/go/container/tools/logging_test.go 
b/sdks/go/container/tools/logging_test.go
new file mode 100644
index 00000000000..e33b3c075a6
--- /dev/null
+++ b/sdks/go/container/tools/logging_test.go
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tools
+
+import (
+       "bytes"
+       "context"
+       "errors"
+       "log"
+       "strings"
+       "testing"
+
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+)
+
+// logCatcher is a fake logSender that records every batch passed to Send and
+// returns err (nil unless a test sets it) from each Send.
+type logCatcher struct {
+       msgs []*fnpb.LogEntry_List
+       err  error
+}
+
+func (c *logCatcher) Send(batch *fnpb.LogEntry_List) error {
+       c.msgs = append(c.msgs, batch)
+       return c.err
+}
+
+// CloseSend is a no-op that always succeeds.
+func (*logCatcher) CloseSend() error { return nil }
+
+func TestLogger(t *testing.T) {
+       ctx := context.Background()
+
+       // captureLog redirects the standard library logger into a buffer for the
+       // rest of the calling (sub)test and restores the previous writer when it
+       // ends, so the redirection doesn't leak into later tests.
+       captureLog := func(t *testing.T) *bytes.Buffer {
+               t.Helper()
+               var buf bytes.Buffer
+               orig := log.Writer()
+               log.SetOutput(&buf)
+               t.Cleanup(func() { log.SetOutput(orig) })
+               return &buf
+       }
+
+       t.Run("SuccessfulLogging", func(t *testing.T) {
+               catcher := &logCatcher{}
+               l := &Logger{client: catcher}
+
+               l.Printf(ctx, "foo %v", "bar")
+
+               if len(catcher.msgs) != 1 || len(catcher.msgs[0].GetLogEntries()) != 1 {
+                       t.Fatalf("l.Printf(\"foo %%v\", \"bar\"): sent %v, want exactly one log entry", catcher.msgs)
+               }
+               received := catcher.msgs[0].GetLogEntries()[0]
+
+               if got, want := received.Message, "foo bar"; got != want {
+                       t.Errorf("l.Printf(\"foo %%v\", \"bar\"): got message %q, want %q", got, want)
+               }
+
+               if got, want := received.Severity, fnpb.LogEntry_Severity_DEBUG; got != want {
+                       t.Errorf("l.Printf(\"foo %%v\", \"bar\"): got severity %v, want %v", got, want)
+               }
+       })
+       t.Run("backup path", func(t *testing.T) {
+               catcher := &logCatcher{err: errors.New("test error")}
+               l := &Logger{client: catcher}
+               buf := captureLog(t)
+
+               wantMsg := "checking for error?"
+               // Use a constant format string so vet's printf check is satisfied.
+               l.Printf(ctx, "%s", wantMsg)
+
+               line, err := buf.ReadString('\n')
+               if err != nil {
+                       t.Fatalf("unexpected error reading from backup log buffer: %v", err)
+               }
+               for _, want := range []string{"boot.go: error logging message over FnAPI", "test error"} {
+                       if !strings.Contains(line, want) {
+                               t.Errorf("backup log buffer didn't contain expected log, got %q, want it to contain %q", line, want)
+                       }
+               }
+
+               line, err = buf.ReadString('\n')
+               if err != nil {
+                       t.Fatalf("unexpected error reading from backup log buffer: %v", err)
+               }
+               if got, want := line, wantMsg; !strings.Contains(got, want) {
+                       t.Errorf("backup log buffer didn't contain the message, got %q, want it to contain %q", got, want)
+               }
+       })
+
+       t.Run("no endpoint", func(t *testing.T) {
+               l := &Logger{}
+               buf := captureLog(t)
+
+               l.Printf(ctx, "trying to log")
+
+               line, err := buf.ReadString('\n')
+               if err != nil {
+                       t.Fatalf("unexpected error reading from backup log buffer: %v", err)
+               }
+               if got, want := line, "no logging endpoint set"; !strings.Contains(got, want) {
+                       t.Errorf("backup log buffer didn't contain expected error, got %q, want it to contain %q", got, want)
+               }
+       })
+}
diff --git a/sdks/go/container/tools/provision.go 
b/sdks/go/container/tools/provision.go
new file mode 100644
index 00000000000..dab3383fc17
--- /dev/null
+++ b/sdks/go/container/tools/provision.go
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package tools contains utilities for Beam bootloader containers, such as
+// obtaining runtime provisioning information (for example, pipeline options)
+// or logging to the FnAPI logging service.
+//
+// For Beam internal use only.
+package tools
+
+import (
+       "context"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "time"
+
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+       "github.com/golang/protobuf/jsonpb"
+       google_pb "github.com/golang/protobuf/ptypes/struct"
+)
+
+// ProvisionInfo dials the provision service at endpoint and returns the
+// worker's runtime provisioning info, closing the connection before it
+// returns. It fails if the dial or the RPC fails, or if the response carries
+// no info.
+func ProvisionInfo(ctx context.Context, endpoint string) (*fnpb.ProvisionInfo, error) {
+       conn, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
+       if err != nil {
+               return nil, err
+       }
+       defer conn.Close()
+
+       resp, err := fnpb.NewProvisionServiceClient(conn).GetProvisionInfo(ctx, &fnpb.GetProvisionInfoRequest{})
+       if err != nil {
+               return nil, fmt.Errorf("failed to get manifest: %w", err)
+       }
+       info := resp.GetInfo()
+       if info == nil {
+               return nil, errors.New("empty manifest")
+       }
+       return info, nil
+}
+
+// OptionsToProto encodes v with encoding/json and parses the result into a
+// proto Struct via JSONToProto. Marshal errors are returned unchanged.
+func OptionsToProto(v any) (*google_pb.Struct, error) {
+       encoded, marshalErr := json.Marshal(v)
+       if marshalErr != nil {
+               return nil, marshalErr
+       }
+       return JSONToProto(string(encoded))
+}
+
+// JSONToProto parses JSON-encoded pipeline options into a proto Struct.
+// Parse errors from jsonpb are returned unchanged.
+func JSONToProto(data string) (*google_pb.Struct, error) {
+       var out google_pb.Struct
+       if err := jsonpb.UnmarshalString(data, &out); err != nil {
+               return nil, err
+       }
+       return &out, nil
+}
+
+// ProtoToOptions decodes opt into v by rendering it to JSON with ProtoToJSON
+// and then calling json.Unmarshal, so v should be a pointer as for
+// json.Unmarshal.
+func ProtoToOptions(opt *google_pb.Struct, v any) error {
+       encoded, err := ProtoToJSON(opt)
+       if err == nil {
+               err = json.Unmarshal([]byte(encoded), v)
+       }
+       return err
+}
+
+// ProtoToJSON converts pipeline options from a proto struct to JSON.
+func ProtoToJSON(opt *google_pb.Struct) (string, error) {
+       if opt == nil {
+               return "{}", nil
+       }
+       return (&jsonpb.Marshaler{}).MarshalToString(opt)
+}
diff --git a/sdks/go/container/tools/provision_test.go 
b/sdks/go/container/tools/provision_test.go
new file mode 100644
index 00000000000..dbb78040632
--- /dev/null
+++ b/sdks/go/container/tools/provision_test.go
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tools
+
+import (
+       "context"
+       "fmt"
+       "log"
+       "net"
+       "reflect"
+       "sync"
+       "testing"
+
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "google.golang.org/grpc"
+)
+
+// s is a nested, JSON-tagged struct used as round-trip input for the option
+// conversion helpers.
+type s struct {
+       A int    `json:"a,omitempty"`
+       B string `json:"b,omitempty"`
+       C bool   `json:"c,omitempty"`
+       D *s     `json:"d,omitempty"`
+}
+
+// TestConversions verifies that we can process proto structs via JSON.
+func TestConversions(t *testing.T) {
+       tests := []s{
+               {},
+               {A: 2},
+               {B: "foo"},
+               {C: true},
+               {D: &s{A: 3}},
+               {A: 1, B: "bar", C: true, D: &s{A: 3, B: "baz"}},
+       }
+
+       for _, test := range tests {
+               enc, err := OptionsToProto(test)
+               if err != nil {
+                       // Later steps would only report confusing follow-on failures.
+                       t.Errorf("Failed to marshal %v: %v", test, err)
+                       continue
+               }
+               var ret s
+               if err := ProtoToOptions(enc, &ret); err != nil {
+                       t.Errorf("Failed to unmarshal %v from %v: %v", test, enc, err)
+                       continue
+               }
+               if !reflect.DeepEqual(test, ret) {
+                       t.Errorf("Unmarshal(Marshal(%v)) = %v, want %v", test, ret, test)
+               }
+       }
+}
+
+// ProvisionServiceServicer is a fake provision service whose GetProvisionInfo
+// always reports a ProvisionInfo with RetrievalToken "token".
+type ProvisionServiceServicer struct {
+       fnpb.UnimplementedProvisionServiceServer
+}
+
+func (ProvisionServiceServicer) GetProvisionInfo(_ context.Context, _ *fnpb.GetProvisionInfoRequest) (*fnpb.GetProvisionInfoResponse, error) {
+       info := &fnpb.ProvisionInfo{RetrievalToken: "token"}
+       return &fnpb.GetProvisionInfoResponse{Info: info}, nil
+}
+
+// setup starts a gRPC server hosting ProvisionServiceServicer on a free local
+// TCP port. It stores the server's address in *addr, calls wg.Done() once the
+// address is set, and then blocks in Serve, so it is meant to run in its own
+// goroutine. Any failure aborts the test binary via log.Fatalf.
+func setup(addr *string, wg *sync.WaitGroup) {
+       lis, err := net.Listen("tcp", ":0")
+       if err != nil {
+               log.Fatalf("failed to find an open port: %v", err)
+       }
+       defer lis.Close()
+       *addr = fmt.Sprintf(":%d", lis.Addr().(*net.TCPAddr).Port)
+
+       srv := grpc.NewServer()
+       defer srv.Stop()
+       fnpb.RegisterProvisionServiceServer(srv, &ProvisionServiceServicer{})
+
+       // Signal readiness before blocking in Serve: the listener is already
+       // open, so a client can connect as soon as the address is published.
+       wg.Done()
+
+       if err := srv.Serve(lis); err != nil {
+               log.Fatalf("cannot serve the server: %v", err)
+       }
+}
+
+// TestProvisionInfo checks that ProvisionInfo returns the info served by a
+// local fake provision service.
+func TestProvisionInfo(t *testing.T) {
+       endpoint := ""
+       var wg sync.WaitGroup
+       wg.Add(1)
+       go setup(&endpoint, &wg)
+       wg.Wait()
+
+       got, err := ProvisionInfo(context.Background(), endpoint)
+       if err != nil {
+               // got is nil here; comparing it would only add a misleading failure.
+               t.Fatalf("ProvisionInfo(%q) returned error: %v", endpoint, err)
+       }
+       want := &fnpb.ProvisionInfo{RetrievalToken: "token"}
+       if got.GetRetrievalToken() != want.GetRetrievalToken() {
+               t.Errorf("ProvisionInfo(%q) = %v, want %v", endpoint, got, want)
+       }
+}
diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go
index a5a42d4fb1e..ea8d3cd8e38 100644
--- a/sdks/java/container/boot.go
+++ b/sdks/java/container/boot.go
@@ -30,10 +30,10 @@ import (
        "strconv"
        "strings"
 
+       "github.com/apache/beam/sdks/v2/go/container/tools"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
-       "github.com/apache/beam/sdks/v2/go/pkg/beam/provision"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/syscallx"
@@ -71,7 +71,7 @@ func main() {
 
        ctx := grpcx.WriteWorkerID(context.Background(), *id)
 
-       info, err := provision.Info(ctx, *provisionEndpoint)
+       info, err := tools.ProvisionInfo(ctx, *provisionEndpoint)
        if err != nil {
                log.Fatalf("Failed to obtain provisioning information: %v", err)
        }
@@ -97,13 +97,14 @@ func main() {
        if *controlEndpoint == "" {
                log.Fatal("No control endpoint provided.")
        }
+       logger := &tools.Logger{Endpoint: *loggingEndpoint}
 
-       log.Printf("Initializing java harness: %v", strings.Join(os.Args, " "))
+       logger.Printf(ctx, "Initializing java harness: %v", 
strings.Join(os.Args, " "))
 
        // (1) Obtain the pipeline options
-       options, err := provision.ProtoToJSON(info.GetPipelineOptions())
+       options, err := tools.ProtoToJSON(info.GetPipelineOptions())
        if err != nil {
-               log.Fatalf("Failed to convert pipeline options: %v", err)
+               logger.Fatalf(ctx, "Failed to convert pipeline options: %v", 
err)
        }
 
        // (2) Retrieve the staged user jars. We ignore any disk limit,
@@ -118,7 +119,7 @@ func main() {
 
        artifacts, err := artifact.Materialize(ctx, *artifactEndpoint, 
info.GetDependencies(), info.GetRetrievalToken(), dir)
        if err != nil {
-               log.Fatalf("Failed to retrieve staged files: %v", err)
+               logger.Fatalf(ctx, "Failed to retrieve staged files: %v", err)
        }
 
        // (3) Invoke the Java harness, preserving artifact ordering in 
classpath.
@@ -178,28 +179,32 @@ func main() {
                                        } else {
                                                args = append(args, 
fmt.Sprintf(googleCloudProfilerAgentBaseArgs, jobName, jobId))
                                        }
-                                       log.Printf("Turning on Cloud Profiling. 
Profile heap: %t", enableGoogleCloudHeapSampling)
+                                       logger.Printf(ctx, "Turning on Cloud 
Profiling. Profile heap: %t", enableGoogleCloudHeapSampling)
                                } else {
-                                       log.Println("Required job_id missing 
from metadata, profiling will not be enabled without it.")
+                                       logger.Printf(ctx, "Required job_id 
missing from metadata, profiling will not be enabled without it.")
                                }
                        } else {
-                               log.Println("Required job_name missing from 
metadata, profiling will not be enabled without it.")
+                               logger.Printf(ctx, "Required job_name missing 
from metadata, profiling will not be enabled without it.")
                        }
                } else {
-                       log.Println("enable_google_cloud_profiler is set to 
true, but no metadata is received from provision server, profiling will not be 
enabled.")
+                       logger.Printf(ctx, "enable_google_cloud_profiler is set 
to true, but no metadata is received from provision server, profiling will not 
be enabled.")
                }
        }
 
        disableJammAgent := strings.Contains(options, disableJammAgentOption)
        if disableJammAgent {
-               log.Printf("Disabling Jamm agent. Measuring object size will be 
inaccurate.")
+               logger.Printf(ctx, "Disabling Jamm agent. Measuring object size 
will be inaccurate.")
        } else {
                args = append(args, jammAgentArgs)
        }
        // Apply meta options
        const metaDir = "/opt/apache/beam/options"
-       metaOptions, err := LoadMetaOptions(metaDir)
-       javaOptions := BuildOptions(metaOptions)
+
+       // Note: Error is unchecked, so parsing errors won't abort container.
+       // TODO: verify if it's intentional or not.
+       metaOptions, _ := LoadMetaOptions(ctx, logger, metaDir)
+
+       javaOptions := BuildOptions(ctx, logger, metaOptions)
        // (1) Add custom jvm arguments: "-server -Xmx1324 -XXfoo .."
        args = append(args, javaOptions.JavaArguments...)
 
@@ -221,19 +226,19 @@ func main() {
        if pipelineOptions, ok := 
info.GetPipelineOptions().GetFields()["options"]; ok {
                if modules, ok := 
pipelineOptions.GetStructValue().GetFields()["jdkAddOpenModules"]; ok {
                        for _, module := range 
modules.GetListValue().GetValues() {
-                               args = append(args, "--add-opens=" + 
module.GetStringValue())
+                               args = append(args, 
"--add-opens="+module.GetStringValue())
                        }
                }
        }
        // Automatically open modules for Java 11+
        openModuleAgentJar := "/opt/apache/beam/jars/open-module-agent.jar"
        if _, err := os.Stat(openModuleAgentJar); err == nil {
-               args = append(args, "-javaagent:" + openModuleAgentJar)
+               args = append(args, "-javaagent:"+openModuleAgentJar)
        }
        args = append(args, "org.apache.beam.fn.harness.FnHarness")
-       log.Printf("Executing: java %v", strings.Join(args, " "))
+       logger.Printf(ctx, "Executing: java %v", strings.Join(args, " "))
 
-       log.Fatalf("Java exited: %v", execx.Execute("java", args...))
+       logger.Fatalf(ctx, "Java exited: %v", execx.Execute("java", args...))
 }
 
 // heapSizeLimit returns 80% of the runner limit, if provided. If not provided,
@@ -301,7 +306,7 @@ func (f byPriority) Less(i, j int) bool { return 
f[i].Priority > f[j].Priority }
 //
 // Loading meta-options from disk allows extra files and their
 // configuration be kept together and defined externally.
-func LoadMetaOptions(dir string) ([]*MetaOption, error) {
+func LoadMetaOptions(ctx context.Context, logger *tools.Logger, dir string) 
([]*MetaOption, error) {
        var meta []*MetaOption
 
        worker := func(path string, info os.FileInfo, err error) error {
@@ -328,7 +333,7 @@ func LoadMetaOptions(dir string) ([]*MetaOption, error) {
                        return fmt.Errorf("failed to parse %s: %v", path, err)
                }
 
-               log.Printf("Loaded meta-option '%s'", option.Name)
+               logger.Printf(ctx, "Loaded meta-option '%s'", option.Name)
 
                meta = append(meta, &option)
                return nil
@@ -340,7 +345,7 @@ func LoadMetaOptions(dir string) ([]*MetaOption, error) {
        return meta, nil
 }
 
-func BuildOptions(metaOptions []*MetaOption) *Options {
+func BuildOptions(ctx context.Context, logger *tools.Logger, metaOptions 
[]*MetaOption) *Options {
        options := &Options{Properties: make(map[string]string)}
 
        sort.Sort(byPriority(metaOptions))
@@ -357,7 +362,7 @@ func BuildOptions(metaOptions []*MetaOption) *Options {
                        if !exists {
                                options.Properties[key] = value
                        } else {
-                               log.Printf("Warning: %s property -D%s=%s was 
redefined", meta.Name, key, value)
+                               logger.Warnf(ctx, "Warning: %s property -D%s=%s 
was redefined", meta.Name, key, value)
                        }
                }
 
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index bb9acbacf62..1a96859669a 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -20,9 +20,9 @@ package main
 import (
        "context"
        "encoding/json"
+       "errors"
        "flag"
        "fmt"
-       "io/ioutil"
        "log"
        "os"
        "os/exec"
@@ -34,9 +34,9 @@ import (
        "syscall"
        "time"
 
+       "github.com/apache/beam/sdks/v2/go/container/tools"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
-       "github.com/apache/beam/sdks/v2/go/pkg/beam/provision"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
        "github.com/golang/protobuf/jsonpb"
@@ -81,7 +81,7 @@ func main() {
                os.Exit(0)
        }
 
-       if *workerPool == true {
+       if *workerPool {
                workerPoolId := fmt.Sprintf("%d", os.Getpid())
                os.Setenv(workerPoolIdEnv, workerPoolId)
                args := []string{
@@ -113,7 +113,7 @@ func main() {
 func launchSDKProcess() error {
        ctx := grpcx.WriteWorkerID(context.Background(), *id)
 
-       info, err := provision.Info(ctx, *provisionEndpoint)
+       info, err := tools.ProvisionInfo(ctx, *provisionEndpoint)
        if err != nil {
                log.Fatalf("Failed to obtain provisioning information: %v", err)
        }
@@ -139,14 +139,14 @@ func launchSDKProcess() error {
        if *controlEndpoint == "" {
                log.Fatalf("No control endpoint provided.")
        }
-
-       log.Printf("Initializing python harness: %v", strings.Join(os.Args, " 
"))
+       logger := &tools.Logger{Endpoint: *loggingEndpoint}
+       logger.Printf(ctx, "Initializing python harness: %v", 
strings.Join(os.Args, " "))
 
        // (1) Obtain the pipeline options
 
-       options, err := provision.ProtoToJSON(info.GetPipelineOptions())
+       options, err := tools.ProtoToJSON(info.GetPipelineOptions())
        if err != nil {
-               log.Fatalf("Failed to convert pipeline options: %v", err)
+               logger.Fatalf(ctx, "Failed to convert pipeline options: %v", 
err)
        }
 
        // (2) Retrieve and install the staged packages.
@@ -157,20 +157,20 @@ func launchSDKProcess() error {
        signalChannel := make(chan os.Signal, 1)
        signal.Notify(signalChannel, syscall.SIGHUP, syscall.SIGINT, 
syscall.SIGTERM)
 
-       venvDir, err := setupVenv("/opt/apache/beam-venv", *id)
+       venvDir, err := setupVenv(ctx, logger, "/opt/apache/beam-venv", *id)
        if err != nil {
-               return fmt.Errorf("Failed to initialize Python venv.")
+               return errors.New("failed to initialize Python venv")
        }
        cleanupFunc := func() {
                os.RemoveAll(venvDir)
-               log.Printf("Cleaned up temporary venv for worker %v.", *id)
+               logger.Printf(ctx, "Cleaned up temporary venv for worker %v.", 
*id)
        }
        defer cleanupFunc()
 
        dir := filepath.Join(*semiPersistDir, "staged")
        files, err := artifact.Materialize(ctx, *artifactEndpoint, 
info.GetDependencies(), info.GetRetrievalToken(), dir)
        if err != nil {
-               return fmt.Errorf("Failed to retrieve staged files: %v", err)
+               return fmt.Errorf("failed to retrieve staged files: %v", err)
        }
 
        // TODO(herohde): the packages to install should be specified 
explicitly. It
@@ -179,7 +179,7 @@ func launchSDKProcess() error {
        requirementsFiles := []string{requirementsFile}
        for i, v := range files {
                name, _ := artifact.MustExtractFilePayload(v)
-               log.Printf("Found artifact: %s", name)
+               logger.Printf(ctx, "Found artifact: %s", name)
                fileNames[i] = name
 
                if v.RoleUrn == artifact.URNPipRequirementsFile {
@@ -188,7 +188,7 @@ func launchSDKProcess() error {
        }
 
        if setupErr := installSetupPackages(fileNames, dir, requirementsFiles); 
setupErr != nil {
-               return fmt.Errorf("Failed to install required packages: %v", 
setupErr)
+               return fmt.Errorf("failed to install required packages: %v", 
setupErr)
        }
 
        // (3) Invoke python
@@ -223,7 +223,7 @@ func launchSDKProcess() error {
 
        // Forward trapped signals to child process groups in order to 
terminate them gracefully and avoid zombies
        go func() {
-               log.Printf("Received signal: %v", <-signalChannel)
+               logger.Printf(ctx, "Received signal: %v", <-signalChannel)
                childPids.mu.Lock()
                childPids.canceled = true
                for _, pid := range childPids.v {
@@ -232,7 +232,7 @@ func launchSDKProcess() error {
                                // have elapsed, i.e., as soon as all 
subprocesses have returned from Wait().
                                time.Sleep(5 * time.Second)
                                if err := syscall.Kill(-pid, syscall.SIGKILL); 
err == nil {
-                                       log.Printf("Worker process %v did not 
respond, killed it.", pid)
+                                       logger.Printf(ctx, "Worker process %v 
did not respond, killed it.", pid)
                                }
                        }(pid)
                        syscall.Kill(-pid, syscall.SIGTERM)
@@ -258,7 +258,7 @@ func launchSDKProcess() error {
                                        childPids.mu.Unlock()
                                        return
                                }
-                               log.Printf("Executing Python (worker %v): 
python %v", workerId, strings.Join(args, " "))
+                               logger.Printf(ctx, "Executing Python (worker 
%v): python %v", workerId, strings.Join(args, " "))
                                cmd := 
StartCommandEnv(map[string]string{"WORKER_ID": workerId}, "python", args...)
                                childPids.v = append(childPids.v, 
cmd.Process.Pid)
                                childPids.mu.Unlock()
@@ -268,14 +268,14 @@ func launchSDKProcess() error {
                                        // DoFns throwing exceptions.
                                        errorCount += 1
                                        if errorCount < 4 {
-                                               log.Printf("Python (worker %v) 
exited %v times: %v\nrestarting SDK process",
+                                               logger.Printf(ctx, "Python 
(worker %v) exited %v times: %v\nrestarting SDK process",
                                                        workerId, errorCount, 
err)
                                        } else {
-                                               log.Fatalf("Python (worker %v) 
exited %v times: %v\nout of retries, failing container",
+                                               logger.Fatalf(ctx, "Python 
(worker %v) exited %v times: %v\nout of retries, failing container",
                                                        workerId, errorCount, 
err)
                                        }
                                } else {
-                                       log.Printf("Python (worker %v) 
exited.", workerId)
+                                       logger.Printf(ctx, "Python (worker %v) 
exited.", workerId)
                                        break
                                }
                        }
@@ -307,22 +307,22 @@ func StartCommandEnv(env map[string]string, prog string, 
args ...string) *exec.C
 }
 
 // setupVenv initializes a local Python venv and sets the corresponding env 
variables
-func setupVenv(baseDir, workerId string) (string, error) {
-       log.Printf("Initializing temporary Python venv ...")
+func setupVenv(ctx context.Context, logger *tools.Logger, baseDir, workerId 
string) (string, error) {
+       logger.Printf(ctx, "Initializing temporary Python venv ...")
 
        dir := filepath.Join(baseDir, "beam-venv-worker-"+workerId)
        if _, err := os.Stat(dir); !os.IsNotExist(err) {
                // Probably leftovers from a previous run
-               log.Printf("Cleaning up previous venv ...")
+               logger.Printf(ctx, "Cleaning up previous venv ...")
                if err := os.RemoveAll(dir); err != nil {
                        return "", err
                }
        }
        if err := os.MkdirAll(dir, 0750); err != nil {
-               return "", fmt.Errorf("Failed to create Python venv directory: 
%s", err)
+               return "", fmt.Errorf("failed to create Python venv directory: 
%s", err)
        }
        if err := execx.Execute("python", "-m", "venv", 
"--system-site-packages", dir); err != nil {
-               return "", fmt.Errorf("Python venv initialization failed: %s", 
err)
+               return "", fmt.Errorf("python venv initialization failed: %s", 
err)
        }
 
        os.Setenv("VIRTUAL_ENV", dir)
@@ -385,16 +385,6 @@ func installSetupPackages(files []string, workDir string, 
requirementsFiles []st
        return nil
 }
 
-// joinPaths joins the dir to every artifact path. Each / in the path is
-// interpreted as a directory separator.
-func joinPaths(dir string, paths ...string) []string {
-       var ret []string
-       for _, p := range paths {
-               ret = append(ret, filepath.Join(dir, filepath.FromSlash(p)))
-       }
-       return ret
-}
-
 // processArtifactsInSetupOnlyMode installs the dependencies found in artifacts
 // when flag --setup_only and --artifacts exist. The setup mode will only
 // process the provided artifacts and skip the actual worker program start up.
@@ -405,7 +395,7 @@ func processArtifactsInSetupOnlyMode() {
                log.Fatal("No --artifacts provided along with --setup_only 
flag.")
        }
        workDir := filepath.Dir(*artifacts)
-       metadata, err := ioutil.ReadFile(*artifacts)
+       metadata, err := os.ReadFile(*artifacts)
        if err != nil {
                log.Fatalf("Unable to open artifacts metadata file %v with 
error %v", *artifacts, err)
        }

Reply via email to