This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e31e8855ad9 [#27839][Go SDK] Write pipeline options to a file, instead of
reading from a flag. (#31482)
e31e8855ad9 is described below
commit e31e8855ad9a7767c79700e7cd5ea31a419a7997
Author: Robert Burke <[email protected]>
AuthorDate: Tue Jun 4 05:17:22 2024 -0700
[#27839][Go SDK] Write pipeline options to a file, instead of reading from a
flag. (#31482)
* [#27839] Move pipeline options file creation to tools package.
* Write options to a file in the container instead of burdening the command
line.
---------
Co-authored-by: lostluck <[email protected]>
---
sdks/go/container/boot.go | 4 ++-
sdks/go/container/tools/pipeline_options.go | 39 ++++++++++++++++++++++
sdks/go/pkg/beam/core/runtime/harness/init/init.go | 17 +++++++++-
sdks/java/container/boot.go | 24 +++----------
4 files changed, 62 insertions(+), 22 deletions(-)
diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go
index 1a5e154ace7..15f9ecc101c 100644
--- a/sdks/go/container/boot.go
+++ b/sdks/go/container/boot.go
@@ -137,7 +137,9 @@ func main() {
"--logging_endpoint=" + *loggingEndpoint,
"--control_endpoint=" + *controlEndpoint,
"--semi_persist_dir=" + *semiPersistDir,
- "--options=" + options,
+ }
+ if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil {
+ logger.Fatalf(ctx, "Failed to load pipeline options to worker:
%v", err)
}
if info.GetStatusEndpoint() != nil {
os.Setenv("STATUS_ENDPOINT", info.GetStatusEndpoint().GetUrl())
diff --git a/sdks/go/container/tools/pipeline_options.go
b/sdks/go/container/tools/pipeline_options.go
new file mode 100644
index 00000000000..7b46d8fa8c8
--- /dev/null
+++ b/sdks/go/container/tools/pipeline_options.go
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tools
+
+import (
+ "fmt"
+ "os"
+)
+
+// MakePipelineOptionsFileAndEnvVar writes the pipeline options to a file.
+// Assumes the options string is JSON formatted.
+//
+// Stores the file name in question in PIPELINE_OPTIONS_FILE for access by the
SDK.
+func MakePipelineOptionsFileAndEnvVar(options string) error {
+ fn := "pipeline_options.json"
+ f, err := os.Create(fn)
+ if err != nil {
+ return fmt.Errorf("unable to create %v: %w", fn, err)
+ }
+ defer f.Close()
+ if _, err := f.WriteString(options); err != nil {
+ return fmt.Errorf("error writing %v: %w", f.Name(), err)
+ }
+ os.Setenv("PIPELINE_OPTIONS_FILE", f.Name())
+ return nil
+}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/init/init.go
b/sdks/go/pkg/beam/core/runtime/harness/init/init.go
index 468708b2917..8a5b45fea5e 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/init/init.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/init/init.go
@@ -51,7 +51,7 @@ var (
controlEndpoint = flag.String("control_endpoint", "", "Local control
gRPC endpoint (required in worker mode).")
//lint:ignore U1000 semiPersistDir flag is passed in through the boot
container, will need to be removed later
semiPersistDir = flag.String("semi_persist_dir", "/tmp", "Local
semi-persistent directory (optional in worker mode).")
- options = flag.String("options", "", "JSON-encoded pipeline
options (required in worker mode).")
+ options = flag.String("options", "", "JSON-encoded pipeline
options (required in worker mode). (deprecated)")
)
type exitMode int
@@ -93,6 +93,21 @@ func hook() {
// will be captured by the framework -- which may not be functional if
// harness.Main returns. We want to be sure any error makes it out.
+ pipelineOptionsFilename := os.Getenv("PIPELINE_OPTIONS_FILE")
+ if pipelineOptionsFilename != "" {
+ if *options != "" {
+ fmt.Fprintf(os.Stderr, "WARNING: env variable
PIPELINE_OPTIONS_FILE set but options flag populated. Potentially bad container
loader. Flag value before overwrite: %v\n", options)
+ }
+ contents, err := os.ReadFile(pipelineOptionsFilename)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Failed to read pipeline options
file '%v': %v\n", pipelineOptionsFilename, err)
+ os.Exit(1)
+ }
+ // Overwrite flag to be consistent with the legacy flag processing.
+ *options = string(contents)
+ }
+ // Load in pipeline options from the flag string. Used for both the new
options file path
+ // and the older flag approach.
if *options != "" {
var opt runtime.RawOptionsWrapper
if err := json.Unmarshal([]byte(*options), &opt); err != nil {
diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go
index f7fd7437c88..ceda3d2be66 100644
--- a/sdks/java/container/boot.go
+++ b/sdks/java/container/boot.go
@@ -124,7 +124,7 @@ func main() {
// (3) Invoke the Java harness, preserving artifact ordering in
classpath.
os.Setenv("HARNESS_ID", *id)
- if err := makePipelineOptionsFile(options); err != nil {
+ if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil {
logger.Fatalf(ctx, "Failed to load pipeline options to worker:
%v", err)
}
os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR",
proto.MarshalTextString(&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}))
@@ -247,29 +247,13 @@ func main() {
logger.Fatalf(ctx, "Java exited: %v", execx.Execute("java", args...))
}
-// makePipelineOptionsFile writes the pipeline options to a file.
-// Assumes the options string is JSON formatted.
-func makePipelineOptionsFile(options string) error {
- fn := "pipeline_options.json"
- f, err := os.Create(fn)
- if err != nil {
- return fmt.Errorf("unable to create %v: %w", fn, err)
- }
- defer f.Close()
- if _, err := f.WriteString(options); err != nil {
- return fmt.Errorf("error writing %v: %w", f.Name(), err)
- }
- os.Setenv("PIPELINE_OPTIONS_FILE", f.Name())
- return nil
-}
-
// heapSizeLimit returns 80% of the runner limit, if provided. If not provided,
// it returns 70% of the physical memory on the machine. If it cannot determine
// that value, it returns 1GB. This is an imperfect heuristic. It aims to
// ensure there is memory for non-heap use and other overhead, while also not
-// underutilizing the machine. if set_recommended_max_xmx experiment is
enabled,
-// sets xmx to 32G. Under 32G JVM enables CompressedOops. CompressedOops
-// utilizes memory more efficiently, and has positive impact on GC performance
+// underutilizing the machine. if set_recommended_max_xmx experiment is
enabled,
+// sets xmx to 32G. Under 32G JVM enables CompressedOops. CompressedOops
+// utilizes memory more efficiently, and has positive impact on GC performance
// and cache hit rate.
func heapSizeLimit(info *fnpb.ProvisionInfo, setRecommendedMaxXmx bool) uint64
{
if setRecommendedMaxXmx {