This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 34aa17de6cd Allow users to pass service name for profiler for Java And
Go SDK (#35903)
34aa17de6cd is described below
commit 34aa17de6cd01b475fd4315ccb43f175eca28feb
Author: Tanu Sharma <[email protected]>
AuthorDate: Tue Sep 30 19:07:29 2025 +0530
Allow users to pass service name for profiler for Java And Go SDK (#35903)
---
sdks/go/container/boot.go | 42 ++++++++++---
sdks/go/container/boot_test.go | 127 +++++++++++++++++++++++++++------------
sdks/java/container/boot.go | 88 +++++++++++++++++++++------
sdks/java/container/boot_test.go | 45 ++++++++++++++
4 files changed, 237 insertions(+), 65 deletions(-)
diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go
index 3f8562f6ca9..b75201520f3 100644
--- a/sdks/go/container/boot.go
+++ b/sdks/go/container/boot.go
@@ -61,22 +61,46 @@ const (
workerPoolIdEnv = "BEAM_GO_WORKER_POOL_ID"
)
-func configureGoogleCloudProfilerEnvVars(ctx context.Context, logger
*tools.Logger, metadata map[string]string) error {
- if metadata == nil {
- return errors.New("enable_google_cloud_profiler is set to true,
but no metadata is received from provision server, profiling will not be
enabled")
+func configureGoogleCloudProfilerEnvVars(ctx context.Context, logger
*tools.Logger, metadata map[string]string, options string) error {
+ const profilerKey = "enable_google_cloud_profiler="
+
+ var parsed map[string]interface{}
+ if err := json.Unmarshal([]byte(options), &parsed); err != nil {
+ panic(err)
}
- jobName, nameExists := metadata["job_name"]
- if !nameExists {
- return errors.New("required job_name missing from metadata,
profiling will not be enabled without it")
+
+ var profilerServiceName string
+
+ // Try from "beam:option:go_options:v1" -> "options" ->
"dataflow_service_options"
+ if goOpts, ok :=
parsed["beam:option:go_options:v1"].(map[string]interface{}); ok {
+ if options, ok := goOpts["options"].(map[string]interface{});
ok {
+ if profilerServiceNameRaw, ok :=
options["dataflow_service_options"].(string); ok {
+ if strings.HasPrefix(profilerServiceNameRaw,
profilerKey) {
+ profilerServiceName =
strings.TrimPrefix(profilerServiceNameRaw, profilerKey)
+ }
+ }
+ }
}
+
+ // Fallback to job_name from metadata
+ if profilerServiceName == "" {
+ if jobName, jobNameExists := metadata["job_name"]; jobNameExists {
+ profilerServiceName = jobName
+ } else {
+ return errors.New("required job_name missing from metadata,
profiling will not be enabled without it")
+ }
+ }
+
jobID, idExists := metadata["job_id"]
if !idExists {
return errors.New("required job_id missing from metadata,
profiling will not be enabled without it")
}
- os.Setenv(cloudProfilingJobName, jobName)
+
+ os.Setenv(cloudProfilingJobName, profilerServiceName)
os.Setenv(cloudProfilingJobID, jobID)
- logger.Printf(ctx, "Cloud Profiling Job Name: %v, Job IDL %v", jobName,
jobID)
+ logger.Printf(ctx, "Cloud Profiling Job Name: %v, Job IDL %v",
profilerServiceName, jobID)
return nil
+
}
func main() {
@@ -184,7 +208,7 @@ func main() {
enableGoogleCloudProfiler := strings.Contains(options,
enableGoogleCloudProfilerOption)
if enableGoogleCloudProfiler {
- err := configureGoogleCloudProfilerEnvVars(ctx, logger,
info.Metadata)
+ err := configureGoogleCloudProfilerEnvVars(ctx, logger,
info.Metadata, options)
if err != nil {
logger.Printf(ctx, "could not configure Google Cloud
Profiler variables, got %v", err)
}
diff --git a/sdks/go/container/boot_test.go b/sdks/go/container/boot_test.go
index 49c78047249..244f91fe42e 100644
--- a/sdks/go/container/boot_test.go
+++ b/sdks/go/container/boot_test.go
@@ -205,57 +205,110 @@ func constructArtifactInformation(t *testing.T, roleUrn
string, path string, sha
}
}
+func clearEnvVars() {
+ _ = os.Unsetenv(cloudProfilingJobName)
+ _ = os.Unsetenv(cloudProfilingJobID)
+}
+
func TestConfigureGoogleCloudProfilerEnvVars(t *testing.T) {
tests := []struct {
- name string
- inputMetadata map[string]string
- expectedName string
- expectedID string
- expectedError string
+ name string
+ options string
+ metadata map[string]string
+ expectedName string
+ expectedID string
+ expectingError bool
}{
{
- "nil metadata",
- nil,
- "",
- "",
- "enable_google_cloud_profiler is set to true, but no
metadata is received from provision server, profiling will not be enabled",
+ name: "Profiler name from options",
+ options: `{
+ "beam:option:go_options:v1": {
+ "options": {
+ "dataflow_service_options":
"enable_google_cloud_profiler=custom_profiler"
+ }
+ }
+ }`,
+ metadata: map[string]string{
+ "job_id": "job-123",
+ },
+ expectedName: "custom_profiler",
+ expectedID: "job-123",
+ expectingError: false,
},
{
- "missing name",
- map[string]string{"job_id": "12345"},
- "",
- "",
- "required job_name missing from metadata, profiling
will not be enabled without it",
+ name: "Fallback to job_name",
+ options: `{
+ "beam:option:go_options:v1": {
+ "options": {
+ "dataflow_service_options":
"enable_google_cloud_profiler"
+ }
+ }
+ }`,
+ metadata: map[string]string{
+ "job_name": "fallback_profiler",
+ "job_id": "job-456",
+ },
+ expectedName: "fallback_profiler",
+ expectedID: "job-456",
+ expectingError: false,
},
{
- "missing id",
- map[string]string{"job_name": "my_job"},
- "",
- "",
- "required job_id missing from metadata, profiling will
not be enabled without it",
+ name: "Missing job_id",
+ options: `{
+ "beam:option:go_options:v1": {
+ "options": {
+ "dataflow_service_options":
"enable_google_cloud_profiler=custom_profiler"
+ }
+ }
+ }`,
+ metadata: map[string]string{
+ "job_name": "custom_profiler",
+ },
+ expectingError: true,
},
{
- "correct",
- map[string]string{"job_name": "my_job", "job_id": "42"},
- "my_job",
- "42",
- "",
- },
+ name: "Missing profiler name and job_name",
+ options: `{
+ "beam:option:go_options:v1": {
+ "options": {
+ "dataflow_service_options":
"enable_google_cloud_profiler"
+ }
+ }
+ }`,
+ metadata: map[string]string{
+ "job_id": "job-789",
+ },
+ expectingError: true,
+ },
}
- for _, test := range tests {
- t.Run(test.name, func(t *testing.T) {
- t.Cleanup(os.Clearenv)
- err :=
configureGoogleCloudProfilerEnvVars(context.Background(), &tools.Logger{},
test.inputMetadata)
- if err != nil {
- if got, want := err.Error(),
test.expectedError; got != want {
- t.Errorf("got error %v, want error %v",
got, want)
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ clearEnvVars()
+ ctx := context.Background()
+
+ err := configureGoogleCloudProfilerEnvVars(ctx,
&tools.Logger{}, tt.metadata, tt.options)
+
+ if tt.expectingError {
+ if err == nil {
+ t.Errorf("Expected error but got nil")
+ }
+ return
+ } else {
+ if err != nil {
+ t.Errorf("Did not expect error but got:
%v", err)
+ return
}
}
- if got, want := os.Getenv(cloudProfilingJobName),
test.expectedName; got != want {
- t.Errorf("got job name %v, want %v", got, want)
+
+ gotName := os.Getenv(cloudProfilingJobName)
+ gotID := os.Getenv(cloudProfilingJobID)
+
+ if gotName != tt.expectedName {
+ t.Errorf("Expected profiler name '%s', got
'%s'", tt.expectedName, gotName)
}
- if got, want := os.Getenv(cloudProfilingJobID),
test.expectedID; got != want {
- t.Errorf("got job id %v, want %v", got, want)
+ if gotID != tt.expectedID {
+ t.Errorf("Expected job ID '%s', got '%s'",
tt.expectedID, gotID)
}
})
}
diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go
index 20283740ca0..2b8b510ee9b 100644
--- a/sdks/java/container/boot.go
+++ b/sdks/java/container/boot.go
@@ -20,6 +20,7 @@ package main
import (
"context"
"encoding/json"
+ "errors"
"flag"
"fmt"
"log"
@@ -196,25 +197,22 @@ func main() {
enableGoogleCloudProfiler := strings.Contains(options,
enableGoogleCloudProfilerOption)
enableGoogleCloudHeapSampling := strings.Contains(options,
enableGoogleCloudHeapSamplingOption)
if enableGoogleCloudProfiler {
- if metadata := info.GetMetadata(); metadata != nil {
- if jobName, nameExists := metadata["job_name"];
nameExists {
- if jobId, idExists := metadata["job_id"];
idExists {
- if enableGoogleCloudHeapSampling {
- args = append(args,
fmt.Sprintf(googleCloudProfilerAgentHeapArgs, jobName, jobId))
- } else {
- args = append(args,
fmt.Sprintf(googleCloudProfilerAgentBaseArgs, jobName, jobId))
- }
- logger.Printf(ctx, "Turning on Cloud
Profiling. Profile heap: %t", enableGoogleCloudHeapSampling)
- } else {
- logger.Printf(ctx, "Required job_id
missing from metadata, profiling will not be enabled without it.")
- }
- } else {
- logger.Printf(ctx, "Required job_name missing
from metadata, profiling will not be enabled without it.")
- }
- } else {
- logger.Printf(ctx, "enable_google_cloud_profiler is set
to true, but no metadata is received from provision server, profiling will not
be enabled.")
- }
- }
+ metadata := info.GetMetadata()
+ profilerServiceName := ExtractProfilerServiceName(options, metadata)
+
+ if profilerServiceName != "" {
+ if jobId, idExists := metadata["job_id"]; idExists {
+ if enableGoogleCloudHeapSampling {
+ args = append(args,
fmt.Sprintf(googleCloudProfilerAgentHeapArgs, profilerServiceName, jobId))
+ } else {
+ args = append(args,
fmt.Sprintf(googleCloudProfilerAgentBaseArgs, profilerServiceName, jobId))
+ }
+ logger.Printf(ctx, "Turning on Cloud Profiling.
Profile heap: %t, service: %s", enableGoogleCloudHeapSampling,
profilerServiceName)
+ } else {
+ logger.Printf(ctx, "job_id is missing from metadata.
Cannot enable profiling.")
+ }
+ }
+ }
disableJammAgent := strings.Contains(options, disableJammAgentOption)
if disableJammAgent {
@@ -426,3 +424,55 @@ func BuildOptions(ctx context.Context, logger
*tools.Logger, metaOptions []*Meta
}
return options
}
+
+func ExtractProfilerServiceName(options string, metadata map[string]string)
string {
+ const profilerKeyPrefix = "enable_google_cloud_profiler="
+
+ var profilerServiceName string
+
+ var parsed map[string]interface{}
+ if err := json.Unmarshal([]byte(options), &parsed); err != nil {
+ return ""
+ }
+
+ displayData, ok := parsed["display_data"].([]interface{})
+ if !ok {
+ return ""
+ }
+
+ for _, item := range displayData {
+ entry, ok := item.(map[string]interface{})
+ if !ok {
+ continue
+ }
+ if entry["key"] == "dataflowServiceOptions" {
+ rawValue, ok := entry["value"].(string)
+ if !ok {
+ continue
+ }
+ cleaned := strings.Trim(rawValue, "[]")
+ opts := strings.Split(cleaned, ",")
+ for _, opt := range opts {
+ opt = strings.TrimSpace(opt)
+ if strings.HasPrefix(opt, profilerKeyPrefix) {
+ parts := strings.SplitN(opt, "=", 2)
+ if len(parts) == 2 {
+ profilerServiceName = parts[1]
+ break
+ }
+ }
+ }
+ }
+ }
+
+ // Fallback to job_name from metadata
+ if profilerServiceName == "" {
+ if jobName, exists := metadata["job_name"]; exists {
+ profilerServiceName = jobName
+ }else {
+ return errors.New("required job_name missing from metadata,
profiling will not be enabled without it").Error()
+ }
+ }
+
+ return profilerServiceName
+}
diff --git a/sdks/java/container/boot_test.go b/sdks/java/container/boot_test.go
index 61d67e93ecb..63564ad097f 100644
--- a/sdks/java/container/boot_test.go
+++ b/sdks/java/container/boot_test.go
@@ -90,3 +90,48 @@ func TestHeapSizeLimit(t *testing.T) {
t.Errorf("HeapSizeLimit(200 GB). Actual (%d). want 168 GB", lim)
}
}
+
+func TestExtractProfilerServiceName(t *testing.T) {
+ tests := []struct {
+ name string
+ options string
+ metadata map[string]string
+ expected string
+ }{
+ {
+ name: "Extracts custom profiler name from options",
+ options: `{
+ "display_data": [
+ {
+ "key": "dataflowServiceOptions",
+ "value":
"[enable_google_cloud_profiler=custom_profiler,
enable_google_cloud_heap_sampling]"
+ }
+ ]
+ }`,
+ metadata: map[string]string{"job_name":
"fallback_profiler"},
+ expected: "custom_profiler",
+ },
+ {
+ name: "Fallback to job_name when profiler not
specified",
+ options: `{
+ "display_data": [
+ {
+ "key": "dataflowServiceOptions",
+ "value":
"[enable_google_cloud_heap_sampling]"
+ }
+ ]
+ }`,
+ metadata: map[string]string{"job_name":
"fallback_profiler"},
+ expected: "fallback_profiler",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := ExtractProfilerServiceName(tt.options,
tt.metadata)
+ if result != tt.expected {
+ t.Errorf("Expected '%s', got '%s'",
tt.expected, result)
+ }
+ })
+ }
+}
\ No newline at end of file