This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch bootloaderLogging in repository https://gitbox.apache.org/repos/asf/beam.git
commit a3d86420330ba916ef1b342d1993aa6055df7435 Author: lostluck <[email protected]> AuthorDate: Wed Mar 29 14:44:28 2023 -0700 (#25314) Have boot loaders log to logging service where possible. --- sdks/go/container/boot.go | 33 +++++---- sdks/go/container/tools/logging.go | 117 ++++++++++++++++++++++++++++++ sdks/go/container/tools/logging_test.go | 115 +++++++++++++++++++++++++++++ sdks/go/container/tools/provision.go | 89 +++++++++++++++++++++++ sdks/go/container/tools/provision_test.go | 110 ++++++++++++++++++++++++++++ sdks/java/container/boot.go | 47 ++++++------ sdks/python/container/boot.go | 62 +++++++--------- 7 files changed, 500 insertions(+), 73 deletions(-) diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go index db851048271..1a5e154ace7 100644 --- a/sdks/go/container/boot.go +++ b/sdks/go/container/boot.go @@ -28,6 +28,7 @@ import ( "strings" "time" + "github.com/apache/beam/sdks/v2/go/container/tools" "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime" @@ -35,7 +36,6 @@ import ( _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" - "github.com/apache/beam/sdks/v2/go/pkg/beam/provision" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/diagnostics" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx" @@ -58,7 +58,7 @@ const ( enableGoogleCloudProfilerOption = "enable_google_cloud_profiler" ) -func configureGoogleCloudProfilerEnvVars(metadata map[string]string) error { +func configureGoogleCloudProfilerEnvVars(ctx context.Context, logger *tools.Logger, metadata map[string]string) error { if metadata == nil { return errors.New("enable_google_cloud_profiler is set to true, but no metadata is received from provision server, profiling will not be enabled") } @@ -72,7 +72,7 @@ func configureGoogleCloudProfilerEnvVars(metadata map[string]string) error { } os.Setenv(cloudProfilingJobName, jobName) os.Setenv(cloudProfilingJobID, jobID) - log.Printf("Cloud Profiling Job Name: %v, Job IDL %v", jobName, jobID) + logger.Printf(ctx, "Cloud Profiling Job Name: %v, Job IDL %v", jobName, jobID) return nil } @@ -87,7 +87,7 @@ func main() { ctx := grpcx.WriteWorkerID(context.Background(), *id) - info, err := provision.Info(ctx, *provisionEndpoint) + info, err := tools.ProvisionInfo(ctx, *provisionEndpoint) if err != nil { log.Fatalf("Failed to obtain provisioning information: %v", err) } @@ -97,13 +97,14 @@ func main() { if err != nil { log.Fatalf("Endpoint not set: %v", err) } - log.Printf("Initializing Go harness: %v", strings.Join(os.Args, " ")) + logger := &tools.Logger{Endpoint: *loggingEndpoint} + logger.Printf(ctx, "Initializing Go harness: %v", strings.Join(os.Args, " ")) // (1) Obtain the pipeline options - options, err := provision.ProtoToJSON(info.GetPipelineOptions()) + options, err := tools.ProtoToJSON(info.GetPipelineOptions()) if err != nil { - log.Fatalf("Failed to convert pipeline options: %v", err) + logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) } // (2) Retrieve the staged files. @@ -115,19 +116,19 @@ func main() { dir := filepath.Join(*semiPersistDir, "staged") artifacts, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir) if err != nil { - log.Fatalf("Failed to retrieve staged files: %v", err) + logger.Fatalf(ctx, "Failed to retrieve staged files: %v", err) } - name, err := getGoWorkerArtifactName(artifacts) + name, err := getGoWorkerArtifactName(ctx, logger, artifacts) if err != nil { - log.Fatalf("Failed to get Go Worker Artifact Name: %v", err) + logger.Fatalf(ctx, "Failed to get Go Worker Artifact Name: %v", err) } // (3) The persist dir may be on a noexec volume, so we must // copy the binary to a different location to execute. const prog = "/bin/worker" if err := copyExe(filepath.Join(dir, name), prog); err != nil { - log.Fatalf("Failed to copy worker binary: %v", err) + logger.Fatalf(ctx, "Failed to copy worker binary: %v", err) } args := []string{ @@ -148,9 +149,9 @@ func main() { enableGoogleCloudProfiler := strings.Contains(options, enableGoogleCloudProfilerOption) if enableGoogleCloudProfiler { - err := configureGoogleCloudProfilerEnvVars(info.Metadata) + err := configureGoogleCloudProfilerEnvVars(ctx, logger, info.Metadata) if err != nil { - log.Printf("could not configure Google Cloud Profiler variables, got %v", err) + logger.Printf(ctx, "could not configure Google Cloud Profiler variables, got %v", err) } } @@ -166,10 +167,10 @@ func main() { } } - log.Fatalf("User program exited: %v", err) + logger.Fatalf(ctx, "User program exited: %v", err) } -func getGoWorkerArtifactName(artifacts []*pipepb.ArtifactInformation) (string, error) { +func getGoWorkerArtifactName(ctx context.Context, logger *tools.Logger, artifacts []*pipepb.ArtifactInformation) (string, error) { const worker = "worker" name := worker @@ -190,7 +191,7 @@ func getGoWorkerArtifactName(artifacts []*pipepb.ArtifactInformation) (string, e for _, a := range artifacts { n, _ := artifact.MustExtractFilePayload(a) if n == worker { - log.Printf("Go worker binary found with legacy name '%v'", worker) + logger.Printf(ctx, "Go worker binary found with legacy name '%v'", worker) return n, nil } } diff --git a/sdks/go/container/tools/logging.go b/sdks/go/container/tools/logging.go new file mode 100644 index 00000000000..fd8ed50e020 --- /dev/null +++ b/sdks/go/container/tools/logging.go @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tools + +import ( + "context" + "errors" + "fmt" + "log" + "os" + "sync" + "time" + + fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" + "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx" + "google.golang.org/protobuf/types/known/timestamppb" +) + +// Logger is a wrapper around the FnAPI Logging Client, intended for +// container boot loader use. Not intended for Beam end users. +type Logger struct { + Endpoint string + + client logSender + closeFn func() + mu sync.Mutex // To protect Send in the rare case multiple goroutines are calling this logger. +} + +type logSender interface { + Send(*fnpb.LogEntry_List) error + CloseSend() error +} + +func (l *Logger) Close() { + if l.closeFn != nil { + l.client.CloseSend() + l.closeFn() + l.closeFn = nil + l.client = nil + } +} + +// Log a message with the given severity. +func (l *Logger) Log(ctx context.Context, sev fnpb.LogEntry_Severity_Enum, message string) { + l.mu.Lock() + defer l.mu.Unlock() + + var exitErr error + defer func() { + if exitErr != nil { + log.Println("boot.go: error logging message over FnAPI. endpoint", l.Endpoint, "error:", exitErr, "message follows") + log.Println(sev.String(), message) + } + }() + if l.client == nil { + if l.Endpoint == "" { + exitErr = errors.New("no logging endpoint set") + return + } + cc, err := grpcx.Dial(ctx, l.Endpoint, 2*time.Minute) + if err != nil { + exitErr = err + return + } + l.closeFn = func() { cc.Close() } + + l.client, err = fnpb.NewBeamFnLoggingClient(cc).Logging(ctx) + if err != nil { + exitErr = err + l.Close() + return + } + } + + err := l.client.Send(&fnpb.LogEntry_List{ + LogEntries: []*fnpb.LogEntry{ + { + Severity: sev, + Timestamp: timestamppb.Now(), + Message: message, + }, + }, + }) + if err != nil { + exitErr = err + return + } +} + +// Printf logs the message with Debug severity. +func (l *Logger) Printf(ctx context.Context, format string, args ...any) { + l.Log(ctx, fnpb.LogEntry_Severity_DEBUG, fmt.Sprintf(format, args...)) +} + +// Warnf logs the message with Warning severity. +func (l *Logger) Warnf(ctx context.Context, format string, args ...any) { + l.Log(ctx, fnpb.LogEntry_Severity_WARN, fmt.Sprintf(format, args...)) +} + +// Fatalf logs the message with Critical severity, and then calls os.Exit(1). +func (l *Logger) Fatalf(ctx context.Context, format string, args ...any) { + l.Log(ctx, fnpb.LogEntry_Severity_CRITICAL, fmt.Sprintf(format, args...)) + os.Exit(1) +} diff --git a/sdks/go/container/tools/logging_test.go b/sdks/go/container/tools/logging_test.go new file mode 100644 index 00000000000..e33b3c075a6 --- /dev/null +++ b/sdks/go/container/tools/logging_test.go @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tools + +import ( + "bytes" + "context" + "errors" + "log" + "strings" + "testing" + + fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" +) + +type logCatcher struct { + msgs []*fnpb.LogEntry_List + err error +} + +func (l *logCatcher) Send(msg *fnpb.LogEntry_List) error { + l.msgs = append(l.msgs, msg) + return l.err +} + +func (l *logCatcher) CloseSend() error { + return nil +} + +func TestLogger(t *testing.T) { + ctx := context.Background() + t.Run("SuccessfulLogging", func(t *testing.T) { + catcher := &logCatcher{} + l := &Logger{client: catcher} + + l.Printf(ctx, "foo %v", "bar") + + received := catcher.msgs[0].GetLogEntries()[0] + + if got, want := received.Message, "foo bar"; got != want { + t.Errorf("l.Printf(\"foo %%v\", \"bar\"): got message %q, want %q", got, want) + } + + if got, want := received.Severity, fnpb.LogEntry_Severity_DEBUG; got != want { + t.Errorf("l.Printf(\"foo %%v\", \"bar\"): got severity %v, want %v", got, want) + } + }) + t.Run("backup path", func(t *testing.T) { + catcher := &logCatcher{} + l := &Logger{client: catcher} + + // Validate error outputs. + var buf bytes.Buffer + ll := log.Default() + ll.SetOutput(&buf) + + catcher.err = errors.New("test error") + wantMsg := "checking for error?" + l.Printf(ctx, wantMsg) + + line, err := buf.ReadString('\n') + if err != nil { + t.Errorf("unexpected error reading form backup log buffer: %v", err) + } + + if got, want := line, "boot.go: error logging message over FnAPI"; !strings.Contains(got, want) { + t.Errorf("backup log buffer didn't contain expected log, got %q, want it to contain %q", got, want) + } + if got, want := line, "test error"; !strings.Contains(got, want) { + t.Errorf("backup log buffer didn't contain expected log, got %q, want it to contain %q", got, want) + } + + line, err = buf.ReadString('\n') + if err != nil { + t.Errorf("unexpected error reading form backup log buffer: %v", err) + } + + if got, want := line, wantMsg; !strings.Contains(got, want) { + t.Errorf("backup log buffer didn't contain the message, got %q, want it to contain %q", got, want) + } + }) + + t.Run("no endpoint", func(t *testing.T) { + l := &Logger{} + + var buf bytes.Buffer + ll := log.Default() + ll.SetOutput(&buf) + + l.Printf(ctx, "trying to log") + + line, err := buf.ReadString('\n') + if err != nil { + t.Errorf("unexpected error reading form backup log buffer: %v", err) + } + if got, want := line, "no logging endpoint set"; !strings.Contains(got, want) { + t.Errorf("backup log buffer didn't contain expected error, got %q, want it to contain %q", got, want) + } + + }) + +} diff --git a/sdks/go/container/tools/provision.go b/sdks/go/container/tools/provision.go new file mode 100644 index 00000000000..dab3383fc17 --- /dev/null +++ b/sdks/go/container/tools/provision.go @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package tools contains utilities for Beam bootloader containers, such as +// for obtaining runtime provision information -- such as pipeline options. +// or for logging to the log service. +// +// For Beam Internal use. +package tools + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" + "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx" + "github.com/golang/protobuf/jsonpb" + google_pb "github.com/golang/protobuf/ptypes/struct" +) + +// ProvisionInfo returns the runtime provisioning info for the worker. +func ProvisionInfo(ctx context.Context, endpoint string) (*fnpb.ProvisionInfo, error) { + cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute) + if err != nil { + return nil, err + } + defer cc.Close() + + client := fnpb.NewProvisionServiceClient(cc) + + resp, err := client.GetProvisionInfo(ctx, &fnpb.GetProvisionInfoRequest{}) + if err != nil { + return nil, fmt.Errorf("failed to get manifest: %w", err) + } + if resp.GetInfo() == nil { + return nil, errors.New("empty manifest") + } + return resp.GetInfo(), nil +} + +// OptionsToProto converts pipeline options to a proto struct via JSON. +func OptionsToProto(v any) (*google_pb.Struct, error) { + data, err := json.Marshal(v) + if err != nil { + return nil, err + } + return JSONToProto(string(data)) +} + +// JSONToProto converts JSON-encoded pipeline options to a proto struct. +func JSONToProto(data string) (*google_pb.Struct, error) { + var out google_pb.Struct + if err := jsonpb.UnmarshalString(string(data), &out); err != nil { + return nil, err + } + return &out, nil +} + +// ProtoToOptions converts pipeline options from a proto struct via JSON. +func ProtoToOptions(opt *google_pb.Struct, v any) error { + data, err := ProtoToJSON(opt) + if err != nil { + return err + } + return json.Unmarshal([]byte(data), v) +} + +// ProtoToJSON converts pipeline options from a proto struct to JSON. +func ProtoToJSON(opt *google_pb.Struct) (string, error) { + if opt == nil { + return "{}", nil + } + return (&jsonpb.Marshaler{}).MarshalToString(opt) +} diff --git a/sdks/go/container/tools/provision_test.go b/sdks/go/container/tools/provision_test.go new file mode 100644 index 00000000000..dbb78040632 --- /dev/null +++ b/sdks/go/container/tools/provision_test.go @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tools + +import ( + "context" + "fmt" + "log" + "net" + "reflect" + "sync" + "testing" + + fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" + "google.golang.org/grpc" +) + +type s struct { + A int `json:"a,omitempty"` + B string `json:"b,omitempty"` + C bool `json:"c,omitempty"` + D *s `json:"d,omitempty"` +} + +// TestConversions verifies that we can process proto structs via JSON. +func TestConversions(t *testing.T) { + tests := []s{ + s{}, + s{A: 2}, + s{B: "foo"}, + s{C: true}, + s{D: &s{A: 3}}, + s{A: 1, B: "bar", C: true, D: &s{A: 3, B: "baz"}}, + } + + for _, test := range tests { + enc, err := OptionsToProto(test) + if err != nil { + t.Errorf("Failed to marshal %v: %v", test, err) + } + var ret s + if err := ProtoToOptions(enc, &ret); err != nil { + t.Errorf("Failed to unmarshal %v from %v: %v", test, enc, err) + } + if !reflect.DeepEqual(test, ret) { + t.Errorf("Unmarshal(Marshal(%v)) = %v, want %v", test, ret, test) + } + } +} + +type ProvisionServiceServicer struct { + fnpb.UnimplementedProvisionServiceServer +} + +func (p ProvisionServiceServicer) GetProvisionInfo(ctx context.Context, req *fnpb.GetProvisionInfoRequest) (*fnpb.GetProvisionInfoResponse, error) { + return &fnpb.GetProvisionInfoResponse{Info: &fnpb.ProvisionInfo{RetrievalToken: "token"}}, nil +} + +func setup(addr *string, wg *sync.WaitGroup) { + l, err := net.Listen("tcp", ":0") + if err != nil { + log.Fatalf("failed to find an open port: %v", err) + } + defer l.Close() + port := l.Addr().(*net.TCPAddr).Port + *addr = fmt.Sprintf(":%d", port) + + server := grpc.NewServer() + defer server.Stop() + + prs := &ProvisionServiceServicer{} + fnpb.RegisterProvisionServiceServer(server, prs) + + wg.Done() + + if err := server.Serve(l); err != nil { + log.Fatalf("cannot serve the server: %v", err) + } +} + +func TestProvisionInfo(t *testing.T) { + + endpoint := "" + var wg sync.WaitGroup + wg.Add(1) + go setup(&endpoint, &wg) + wg.Wait() + + got, err := ProvisionInfo(context.Background(), endpoint) + if err != nil { + t.Errorf("error in response: %v", err) + } + want := &fnpb.ProvisionInfo{RetrievalToken: "token"} + if got.GetRetrievalToken() != want.GetRetrievalToken() { + t.Errorf("provision.Info() = %v, want %v", got, want) + } +} diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index a5a42d4fb1e..ea8d3cd8e38 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -30,10 +30,10 @@ import ( "strconv" "strings" + "github.com/apache/beam/sdks/v2/go/container/tools" "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" - "github.com/apache/beam/sdks/v2/go/pkg/beam/provision" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/syscallx" @@ -71,7 +71,7 @@ func main() { ctx := grpcx.WriteWorkerID(context.Background(), *id) - info, err := provision.Info(ctx, *provisionEndpoint) + info, err := tools.ProvisionInfo(ctx, *provisionEndpoint) if err != nil { log.Fatalf("Failed to obtain provisioning information: %v", err) } @@ -97,13 +97,14 @@ func main() { if *controlEndpoint == "" { log.Fatal("No control endpoint provided.") } + logger := &tools.Logger{Endpoint: *loggingEndpoint} - log.Printf("Initializing java harness: %v", strings.Join(os.Args, " ")) + logger.Printf(ctx, "Initializing java harness: %v", strings.Join(os.Args, " ")) // (1) Obtain the pipeline options - options, err := provision.ProtoToJSON(info.GetPipelineOptions()) + options, err := tools.ProtoToJSON(info.GetPipelineOptions()) if err != nil { - log.Fatalf("Failed to convert pipeline options: %v", err) + logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) } // (2) Retrieve the staged user jars. We ignore any disk limit, @@ -118,7 +119,7 @@ func main() { artifacts, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir) if err != nil { - log.Fatalf("Failed to retrieve staged files: %v", err) + logger.Fatalf(ctx, "Failed to retrieve staged files: %v", err) } // (3) Invoke the Java harness, preserving artifact ordering in classpath. @@ -178,28 +179,32 @@ func main() { } else { args = append(args, fmt.Sprintf(googleCloudProfilerAgentBaseArgs, jobName, jobId)) } - log.Printf("Turning on Cloud Profiling. Profile heap: %t", enableGoogleCloudHeapSampling) + logger.Printf(ctx, "Turning on Cloud Profiling. Profile heap: %t", enableGoogleCloudHeapSampling) } else { - log.Println("Required job_id missing from metadata, profiling will not be enabled without it.") + logger.Printf(ctx, "Required job_id missing from metadata, profiling will not be enabled without it.") } } else { - log.Println("Required job_name missing from metadata, profiling will not be enabled without it.") + logger.Printf(ctx, "Required job_name missing from metadata, profiling will not be enabled without it.") } } else { - log.Println("enable_google_cloud_profiler is set to true, but no metadata is received from provision server, profiling will not be enabled.") + logger.Printf(ctx, "enable_google_cloud_profiler is set to true, but no metadata is received from provision server, profiling will not be enabled.") } } disableJammAgent := strings.Contains(options, disableJammAgentOption) if disableJammAgent { - log.Printf("Disabling Jamm agent. Measuring object size will be inaccurate.") + logger.Printf(ctx, "Disabling Jamm agent. Measuring object size will be inaccurate.") } else { args = append(args, jammAgentArgs) } // Apply meta options const metaDir = "/opt/apache/beam/options" - metaOptions, err := LoadMetaOptions(metaDir) - javaOptions := BuildOptions(metaOptions) + + // Note: Error is unchecked, so parsing errors won't abort container. + // TODO: verify if it's intentional or not. + metaOptions, _ := LoadMetaOptions(ctx, logger, metaDir) + + javaOptions := BuildOptions(ctx, logger, metaOptions) // (1) Add custom jvm arguments: "-server -Xmx1324 -XXfoo .." args = append(args, javaOptions.JavaArguments...) @@ -221,19 +226,19 @@ func main() { if pipelineOptions, ok := info.GetPipelineOptions().GetFields()["options"]; ok { if modules, ok := pipelineOptions.GetStructValue().GetFields()["jdkAddOpenModules"]; ok { for _, module := range modules.GetListValue().GetValues() { - args = append(args, "--add-opens=" + module.GetStringValue()) + args = append(args, "--add-opens="+module.GetStringValue()) } } } // Automatically open modules for Java 11+ openModuleAgentJar := "/opt/apache/beam/jars/open-module-agent.jar" if _, err := os.Stat(openModuleAgentJar); err == nil { - args = append(args, "-javaagent:" + openModuleAgentJar) + args = append(args, "-javaagent:"+openModuleAgentJar) } args = append(args, "org.apache.beam.fn.harness.FnHarness") - log.Printf("Executing: java %v", strings.Join(args, " ")) + logger.Printf(ctx, "Executing: java %v", strings.Join(args, " ")) - log.Fatalf("Java exited: %v", execx.Execute("java", args...)) + logger.Fatalf(ctx, "Java exited: %v", execx.Execute("java", args...)) } // heapSizeLimit returns 80% of the runner limit, if provided. If not provided, @@ -301,7 +306,7 @@ func (f byPriority) Less(i, j int) bool { return f[i].Priority > f[j].Priority } // // Loading meta-options from disk allows extra files and their // configuration be kept together and defined externally. -func LoadMetaOptions(dir string) ([]*MetaOption, error) { +func LoadMetaOptions(ctx context.Context, logger *tools.Logger, dir string) ([]*MetaOption, error) { var meta []*MetaOption worker := func(path string, info os.FileInfo, err error) error { @@ -328,7 +333,7 @@ func LoadMetaOptions(dir string) ([]*MetaOption, error) { return fmt.Errorf("failed to parse %s: %v", path, err) } - log.Printf("Loaded meta-option '%s'", option.Name) + logger.Printf(ctx, "Loaded meta-option '%s'", option.Name) meta = append(meta, &option) return nil @@ -340,7 +345,7 @@ func LoadMetaOptions(dir string) ([]*MetaOption, error) { return meta, nil } -func BuildOptions(metaOptions []*MetaOption) *Options { +func BuildOptions(ctx context.Context, logger *tools.Logger, metaOptions []*MetaOption) *Options { options := &Options{Properties: make(map[string]string)} sort.Sort(byPriority(metaOptions)) @@ -357,7 +362,7 @@ func BuildOptions(metaOptions []*MetaOption) *Options { if !exists { options.Properties[key] = value } else { - log.Printf("Warning: %s property -D%s=%s was redefined", meta.Name, key, value) + logger.Warnf(ctx, "Warning: %s property -D%s=%s was redefined", meta.Name, key, value) } } diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index bb9acbacf62..1a96859669a 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -20,9 +20,9 @@ package main import ( "context" "encoding/json" + "errors" "flag" "fmt" - "io/ioutil" "log" "os" "os/exec" @@ -34,9 +34,9 @@ import ( "syscall" "time" + "github.com/apache/beam/sdks/v2/go/container/tools" "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" - "github.com/apache/beam/sdks/v2/go/pkg/beam/provision" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx" "github.com/golang/protobuf/jsonpb" @@ -81,7 +81,7 @@ func main() { os.Exit(0) } - if *workerPool == true { + if *workerPool { workerPoolId := fmt.Sprintf("%d", os.Getpid()) os.Setenv(workerPoolIdEnv, workerPoolId) args := []string{ @@ -113,7 +113,7 @@ func main() { func launchSDKProcess() error { ctx := grpcx.WriteWorkerID(context.Background(), *id) - info, err := provision.Info(ctx, *provisionEndpoint) + info, err := tools.ProvisionInfo(ctx, *provisionEndpoint) if err != nil { log.Fatalf("Failed to obtain provisioning information: %v", err) } @@ -139,14 +139,14 @@ func launchSDKProcess() error { if *controlEndpoint == "" { log.Fatalf("No control endpoint provided.") } - - log.Printf("Initializing python harness: %v", strings.Join(os.Args, " ")) + logger := &tools.Logger{Endpoint: *loggingEndpoint} + logger.Printf(ctx, "Initializing python harness: %v", strings.Join(os.Args, " ")) // (1) Obtain the pipeline options - options, err := provision.ProtoToJSON(info.GetPipelineOptions()) + options, err := tools.ProtoToJSON(info.GetPipelineOptions()) if err != nil { - log.Fatalf("Failed to convert pipeline options: %v", err) + logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) } // (2) Retrieve and install the staged packages. @@ -157,20 +157,20 @@ func launchSDKProcess() error { signalChannel := make(chan os.Signal, 1) signal.Notify(signalChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) - venvDir, err := setupVenv("/opt/apache/beam-venv", *id) + venvDir, err := setupVenv(ctx, logger, "/opt/apache/beam-venv", *id) if err != nil { - return fmt.Errorf("Failed to initialize Python venv.") + return errors.New("failed to initialize Python venv") } cleanupFunc := func() { os.RemoveAll(venvDir) - log.Printf("Cleaned up temporary venv for worker %v.", *id) + logger.Printf(ctx, "Cleaned up temporary venv for worker %v.", *id) } defer cleanupFunc() dir := filepath.Join(*semiPersistDir, "staged") files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir) if err != nil { - return fmt.Errorf("Failed to retrieve staged files: %v", err) + return fmt.Errorf("failed to retrieve staged files: %v", err) } // TODO(herohde): the packages to install should be specified explicitly. It @@ -179,7 +179,7 @@ func launchSDKProcess() error { requirementsFiles := []string{requirementsFile} for i, v := range files { name, _ := artifact.MustExtractFilePayload(v) - log.Printf("Found artifact: %s", name) + logger.Printf(ctx, "Found artifact: %s", name) fileNames[i] = name if v.RoleUrn == artifact.URNPipRequirementsFile { @@ -188,7 +188,7 @@ func launchSDKProcess() error { } if setupErr := installSetupPackages(fileNames, dir, requirementsFiles); setupErr != nil { - return fmt.Errorf("Failed to install required packages: %v", setupErr) + return fmt.Errorf("failed to install required packages: %v", setupErr) } // (3) Invoke python @@ -223,7 +223,7 @@ func launchSDKProcess() error { // Forward trapped signals to child process groups in order to terminate them gracefully and avoid zombies go func() { - log.Printf("Received signal: %v", <-signalChannel) + logger.Printf(ctx, "Received signal: %v", <-signalChannel) childPids.mu.Lock() childPids.canceled = true for _, pid := range childPids.v { @@ -232,7 +232,7 @@ func launchSDKProcess() error { // have elapsed, i.e., as soon as all subprocesses have returned from Wait(). time.Sleep(5 * time.Second) if err := syscall.Kill(-pid, syscall.SIGKILL); err == nil { - log.Printf("Worker process %v did not respond, killed it.", pid) + logger.Printf(ctx, "Worker process %v did not respond, killed it.", pid) } }(pid) syscall.Kill(-pid, syscall.SIGTERM) @@ -258,7 +258,7 @@ func launchSDKProcess() error { childPids.mu.Unlock() return } - log.Printf("Executing Python (worker %v): python %v", workerId, strings.Join(args, " ")) + logger.Printf(ctx, "Executing Python (worker %v): python %v", workerId, strings.Join(args, " ")) cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, "python", args...) childPids.v = append(childPids.v, cmd.Process.Pid) childPids.mu.Unlock() @@ -268,14 +268,14 @@ func launchSDKProcess() error { // DoFns throwing exceptions. errorCount += 1 if errorCount < 4 { - log.Printf("Python (worker %v) exited %v times: %v\nrestarting SDK process", + logger.Printf(ctx, "Python (worker %v) exited %v times: %v\nrestarting SDK process", workerId, errorCount, err) } else { - log.Fatalf("Python (worker %v) exited %v times: %v\nout of retries, failing container", + logger.Fatalf(ctx, "Python (worker %v) exited %v times: %v\nout of retries, failing container", workerId, errorCount, err) } } else { - log.Printf("Python (worker %v) exited.", workerId) + logger.Printf(ctx, "Python (worker %v) exited.", workerId) break } } @@ -307,22 +307,22 @@ func StartCommandEnv(env map[string]string, prog string, args ...string) *exec.C } // setupVenv initializes a local Python venv and sets the corresponding env variables -func setupVenv(baseDir, workerId string) (string, error) { - log.Printf("Initializing temporary Python venv ...") +func setupVenv(ctx context.Context, logger *tools.Logger, baseDir, workerId string) (string, error) { + logger.Printf(ctx, "Initializing temporary Python venv ...") dir := filepath.Join(baseDir, "beam-venv-worker-"+workerId) if _, err := os.Stat(dir); !os.IsNotExist(err) { // Probably leftovers from a previous run - log.Printf("Cleaning up previous venv ...") + logger.Printf(ctx, "Cleaning up previous venv ...") if err := os.RemoveAll(dir); err != nil { return "", err } } if err := os.MkdirAll(dir, 0750); err != nil { - return "", fmt.Errorf("Failed to create Python venv directory: %s", err) + return "", fmt.Errorf("failed to create Python venv directory: %s", err) } if err := execx.Execute("python", "-m", "venv", "--system-site-packages", dir); err != nil { - return "", fmt.Errorf("Python venv initialization failed: %s", err) + return "", fmt.Errorf("python venv initialization failed: %s", err) } os.Setenv("VIRTUAL_ENV", dir) @@ -385,16 +385,6 @@ func installSetupPackages(files []string, workDir string, requirementsFiles []st return nil } -// joinPaths joins the dir to every artifact path. Each / in the path is -// interpreted as a directory separator. -func joinPaths(dir string, paths ...string) []string { - var ret []string - for _, p := range paths { - ret = append(ret, filepath.Join(dir, filepath.FromSlash(p))) - } - return ret -} - // processArtifactsInSetupOnlyMode installs the dependencies found in artifacts // when flag --setup_only and --artifacts exist. The setup mode will only // process the provided artifacts and skip the actual worker program start up. @@ -405,7 +395,7 @@ func processArtifactsInSetupOnlyMode() { log.Fatal("No --artifacts provided along with --setup_only flag.") } workDir := filepath.Dir(*artifacts) - metadata, err := ioutil.ReadFile(*artifacts) + metadata, err := os.ReadFile(*artifacts) if err != nil { log.Fatalf("Unable to open artifacts metadata file %v with error %v", *artifacts, err) }
