This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 34aa17de6cd Allow users to pass service name for profiler for Java And Go SDK (#35903)
34aa17de6cd is described below

commit 34aa17de6cd01b475fd4315ccb43f175eca28feb
Author: Tanu Sharma <[email protected]>
AuthorDate: Tue Sep 30 19:07:29 2025 +0530

    Allow users to pass service name for profiler for Java And Go SDK (#35903)
---
 sdks/go/container/boot.go        |  42 ++++++++++---
 sdks/go/container/boot_test.go   | 127 +++++++++++++++++++++++++++------------
 sdks/java/container/boot.go      |  88 +++++++++++++++++++++------
 sdks/java/container/boot_test.go |  45 ++++++++++++++
 4 files changed, 237 insertions(+), 65 deletions(-)

diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go
index 3f8562f6ca9..b75201520f3 100644
--- a/sdks/go/container/boot.go
+++ b/sdks/go/container/boot.go
@@ -61,22 +61,46 @@ const (
        workerPoolIdEnv                 = "BEAM_GO_WORKER_POOL_ID"
 )
 
-func configureGoogleCloudProfilerEnvVars(ctx context.Context, logger 
*tools.Logger, metadata map[string]string) error {
-       if metadata == nil {
-               return errors.New("enable_google_cloud_profiler is set to true, 
but no metadata is received from provision server, profiling will not be 
enabled")
+func configureGoogleCloudProfilerEnvVars(ctx context.Context, logger 
*tools.Logger, metadata map[string]string, options string) error {
+       const profilerKey = "enable_google_cloud_profiler="
+
+       var parsed map[string]interface{}
+       if err := json.Unmarshal([]byte(options), &parsed); err != nil {
+               panic(err)
        }
-       jobName, nameExists := metadata["job_name"]
-       if !nameExists {
-               return errors.New("required job_name missing from metadata, 
profiling will not be enabled without it")
+
+       var profilerServiceName string
+
+       // Try from "beam:option:go_options:v1" -> "options" -> 
"dataflow_service_options"
+       if goOpts, ok := 
parsed["beam:option:go_options:v1"].(map[string]interface{}); ok {
+               if options, ok := goOpts["options"].(map[string]interface{}); 
ok {
+                       if profilerServiceNameRaw, ok := 
options["dataflow_service_options"].(string); ok {
+                               if strings.HasPrefix(profilerServiceNameRaw, 
profilerKey) {
+                                       profilerServiceName = 
strings.TrimPrefix(profilerServiceNameRaw, profilerKey)
+                               }
+                       }
+               }
        }
+
+       // Fallback to job_name from metadata
+    if profilerServiceName == "" {
+        if jobName, jobNameExists := metadata["job_name"]; jobNameExists {
+            profilerServiceName = jobName
+        } else {
+            return errors.New("required job_name missing from metadata, 
profiling will not be enabled without it")
+        }
+    }
+
        jobID, idExists := metadata["job_id"]
        if !idExists {
                return errors.New("required job_id missing from metadata, 
profiling will not be enabled without it")
        }
-       os.Setenv(cloudProfilingJobName, jobName)
+
+       os.Setenv(cloudProfilingJobName, profilerServiceName)
        os.Setenv(cloudProfilingJobID, jobID)
-       logger.Printf(ctx, "Cloud Profiling Job Name: %v, Job IDL %v", jobName, 
jobID)
+       logger.Printf(ctx, "Cloud Profiling Job Name: %v, Job IDL %v", 
profilerServiceName, jobID)
        return nil
+
 }
 
 func main() {
@@ -184,7 +208,7 @@ func main() {
 
        enableGoogleCloudProfiler := strings.Contains(options, 
enableGoogleCloudProfilerOption)
        if enableGoogleCloudProfiler {
-               err := configureGoogleCloudProfilerEnvVars(ctx, logger, 
info.Metadata)
+               err := configureGoogleCloudProfilerEnvVars(ctx, logger, 
info.Metadata, options)
                if err != nil {
                        logger.Printf(ctx, "could not configure Google Cloud 
Profiler variables, got %v", err)
                }
diff --git a/sdks/go/container/boot_test.go b/sdks/go/container/boot_test.go
index 49c78047249..244f91fe42e 100644
--- a/sdks/go/container/boot_test.go
+++ b/sdks/go/container/boot_test.go
@@ -205,57 +205,110 @@ func constructArtifactInformation(t *testing.T, roleUrn 
string, path string, sha
        }
 }
 
+func clearEnvVars() {
+       _ = os.Unsetenv(cloudProfilingJobName)
+       _ = os.Unsetenv(cloudProfilingJobID)
+}
+
 func TestConfigureGoogleCloudProfilerEnvVars(t *testing.T) {
        tests := []struct {
-               name          string
-               inputMetadata map[string]string
-               expectedName  string
-               expectedID    string
-               expectedError string
+               name           string
+               options        string
+               metadata       map[string]string
+               expectedName   string
+               expectedID     string
+               expectingError bool
        }{
                {
-                       "nil metadata",
-                       nil,
-                       "",
-                       "",
-                       "enable_google_cloud_profiler is set to true, but no 
metadata is received from provision server, profiling will not be enabled",
+                       name: "Profiler name from options",
+                       options: `{
+                               "beam:option:go_options:v1": {
+                                       "options": {
+                                               "dataflow_service_options": 
"enable_google_cloud_profiler=custom_profiler"
+                                       }
+                               }
+                       }`,
+                       metadata: map[string]string{
+                               "job_id": "job-123",
+                       },
+                       expectedName:   "custom_profiler",
+                       expectedID:     "job-123",
+                       expectingError: false,
                },
                {
-                       "missing name",
-                       map[string]string{"job_id": "12345"},
-                       "",
-                       "",
-                       "required job_name missing from metadata, profiling 
will not be enabled without it",
+                       name: "Fallback to job_name",
+                       options: `{
+                               "beam:option:go_options:v1": {
+                                       "options": {
+                                           "dataflow_service_options": 
"enable_google_cloud_profiler"
+                                       }
+                               }
+                       }`,
+                       metadata: map[string]string{
+                               "job_name": "fallback_profiler",
+                               "job_id":   "job-456",
+                       },
+                       expectedName:   "fallback_profiler",
+                       expectedID:     "job-456",
+                       expectingError: false,
                },
                {
-                       "missing id",
-                       map[string]string{"job_name": "my_job"},
-                       "",
-                       "",
-                       "required job_id missing from metadata, profiling will 
not be enabled without it",
+                       name: "Missing job_id",
+                       options: `{
+                               "beam:option:go_options:v1": {
+                                       "options": {
+                                               "dataflow_service_options": 
"enable_google_cloud_profiler=custom_profiler"
+                                       }
+                               }
+                       }`,
+                       metadata: map[string]string{
+                               "job_name": "custom_profiler",
+                       },
+                       expectingError: true,
                },
                {
-                       "correct",
-                       map[string]string{"job_name": "my_job", "job_id": "42"},
-                       "my_job",
-                       "42",
-                       "",
-               },
+               name: "Missing profiler name and job_name",
+               options: `{
+                       "beam:option:go_options:v1": {
+                               "options": {
+                                       "dataflow_service_options": 
"enable_google_cloud_profiler"
+                               }
+                       }
+               }`,
+               metadata: map[string]string{
+                       "job_id": "job-789",
+               },
+               expectingError: true,
+        },
        }
-       for _, test := range tests {
-               t.Run(test.name, func(t *testing.T) {
-                       t.Cleanup(os.Clearenv)
-                       err := 
configureGoogleCloudProfilerEnvVars(context.Background(), &tools.Logger{}, 
test.inputMetadata)
-                       if err != nil {
-                               if got, want := err.Error(), 
test.expectedError; got != want {
-                                       t.Errorf("got error %v, want error %v", 
got, want)
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       clearEnvVars()
+                       ctx := context.Background()
+
+                       err := configureGoogleCloudProfilerEnvVars(ctx, 
&tools.Logger{}, tt.metadata, tt.options)
+
+                       if tt.expectingError {
+                               if err == nil {
+                                       t.Errorf("Expected error but got nil")
+                               }
+                               return
+                       } else {
+                               if err != nil {
+                                       t.Errorf("Did not expect error but got: 
%v", err)
+                                       return
                                }
                        }
-                       if got, want := os.Getenv(cloudProfilingJobName), 
test.expectedName; got != want {
-                               t.Errorf("got job name %v, want %v", got, want)
+
+                       gotName := os.Getenv(cloudProfilingJobName)
+                       gotID := os.Getenv(cloudProfilingJobID)
+
+                       if gotName != tt.expectedName {
+                               t.Errorf("Expected profiler name '%s', got 
'%s'", tt.expectedName, gotName)
                        }
-                       if got, want := os.Getenv(cloudProfilingJobID), 
test.expectedID; got != want {
-                               t.Errorf("got job id %v, want %v", got, want)
+                       if gotID != tt.expectedID {
+                               t.Errorf("Expected job ID '%s', got '%s'", 
tt.expectedID, gotID)
                        }
                })
        }
diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go
index 20283740ca0..2b8b510ee9b 100644
--- a/sdks/java/container/boot.go
+++ b/sdks/java/container/boot.go
@@ -20,6 +20,7 @@ package main
 import (
        "context"
        "encoding/json"
+       "errors"
        "flag"
        "fmt"
        "log"
@@ -196,25 +197,22 @@ func main() {
        enableGoogleCloudProfiler := strings.Contains(options, 
enableGoogleCloudProfilerOption)
        enableGoogleCloudHeapSampling := strings.Contains(options, 
enableGoogleCloudHeapSamplingOption)
        if enableGoogleCloudProfiler {
-               if metadata := info.GetMetadata(); metadata != nil {
-                       if jobName, nameExists := metadata["job_name"]; 
nameExists {
-                               if jobId, idExists := metadata["job_id"]; 
idExists {
-                                       if enableGoogleCloudHeapSampling {
-                                               args = append(args, 
fmt.Sprintf(googleCloudProfilerAgentHeapArgs, jobName, jobId))
-                                       } else {
-                                               args = append(args, 
fmt.Sprintf(googleCloudProfilerAgentBaseArgs, jobName, jobId))
-                                       }
-                                       logger.Printf(ctx, "Turning on Cloud 
Profiling. Profile heap: %t", enableGoogleCloudHeapSampling)
-                               } else {
-                                       logger.Printf(ctx, "Required job_id 
missing from metadata, profiling will not be enabled without it.")
-                               }
-                       } else {
-                               logger.Printf(ctx, "Required job_name missing 
from metadata, profiling will not be enabled without it.")
-                       }
-               } else {
-                       logger.Printf(ctx, "enable_google_cloud_profiler is set 
to true, but no metadata is received from provision server, profiling will not 
be enabled.")
-               }
-       }
+               metadata := info.GetMetadata()
+           profilerServiceName := ExtractProfilerServiceName(options, metadata)
+
+           if profilerServiceName != "" {
+                  if jobId, idExists := metadata["job_id"]; idExists {
+                          if enableGoogleCloudHeapSampling {
+                                  args = append(args, 
fmt.Sprintf(googleCloudProfilerAgentHeapArgs, profilerServiceName, jobId))
+                          } else {
+                                  args = append(args, 
fmt.Sprintf(googleCloudProfilerAgentBaseArgs, profilerServiceName, jobId))
+                          }
+                          logger.Printf(ctx, "Turning on Cloud Profiling. 
Profile heap: %t, service: %s", enableGoogleCloudHeapSampling, 
profilerServiceName)
+                  } else {
+                          logger.Printf(ctx, "job_id is missing from metadata. 
Cannot enable profiling.")
+                  }
+           }
+    }
 
        disableJammAgent := strings.Contains(options, disableJammAgentOption)
        if disableJammAgent {
@@ -426,3 +424,55 @@ func BuildOptions(ctx context.Context, logger 
*tools.Logger, metaOptions []*Meta
        }
        return options
 }
+
+func ExtractProfilerServiceName(options string, metadata map[string]string) 
string {
+       const profilerKeyPrefix = "enable_google_cloud_profiler="
+
+       var profilerServiceName string
+
+       var parsed map[string]interface{}
+       if err := json.Unmarshal([]byte(options), &parsed); err != nil {
+               return ""
+       }
+
+       displayData, ok := parsed["display_data"].([]interface{})
+       if !ok {
+               return ""
+       }
+
+       for _, item := range displayData {
+               entry, ok := item.(map[string]interface{})
+               if !ok {
+                       continue
+               }
+               if entry["key"] == "dataflowServiceOptions" {
+                       rawValue, ok := entry["value"].(string)
+                       if !ok {
+                               continue
+                       }
+                       cleaned := strings.Trim(rawValue, "[]")
+                       opts := strings.Split(cleaned, ",")
+                       for _, opt := range opts {
+                               opt = strings.TrimSpace(opt)
+                               if strings.HasPrefix(opt, profilerKeyPrefix) {
+                                       parts := strings.SplitN(opt, "=", 2)
+                                       if len(parts) == 2 {
+                                               profilerServiceName = parts[1]
+                                               break
+                                       }
+                               }
+                       }
+               }
+       }
+
+       // Fallback to job_name from metadata
+       if profilerServiceName == "" {
+               if jobName, exists := metadata["job_name"]; exists {
+                       profilerServiceName = jobName
+               }else {
+            return errors.New("required job_name missing from metadata, 
profiling will not be enabled without it").Error()
+        }
+       }
+
+       return profilerServiceName
+}
diff --git a/sdks/java/container/boot_test.go b/sdks/java/container/boot_test.go
index 61d67e93ecb..63564ad097f 100644
--- a/sdks/java/container/boot_test.go
+++ b/sdks/java/container/boot_test.go
@@ -90,3 +90,48 @@ func TestHeapSizeLimit(t *testing.T) {
                t.Errorf("HeapSizeLimit(200 GB). Actual (%d). want 168 GB", lim)
        }
 }
+
+func TestExtractProfilerServiceName(t *testing.T) {
+       tests := []struct {
+               name     string
+               options  string
+               metadata map[string]string
+               expected string
+       }{
+               {
+                       name: "Extracts custom profiler name from options",
+                       options: `{
+                               "display_data": [
+                                       {
+                                               "key": "dataflowServiceOptions",
+                                               "value": 
"[enable_google_cloud_profiler=custom_profiler, 
enable_google_cloud_heap_sampling]"
+                                       }
+                               ]
+                       }`,
+                       metadata: map[string]string{"job_name": 
"fallback_profiler"},
+                       expected: "custom_profiler",
+               },
+               {
+                       name: "Fallback to job_name when profiler not 
specified",
+                       options: `{
+                               "display_data": [
+                                       {
+                                               "key": "dataflowServiceOptions",
+                                               "value": 
"[enable_google_cloud_heap_sampling]"
+                                       }
+                               ]
+                       }`,
+                       metadata: map[string]string{"job_name": 
"fallback_profiler"},
+                       expected: "fallback_profiler",
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result := ExtractProfilerServiceName(tt.options, 
tt.metadata)
+                       if result != tt.expected {
+                               t.Errorf("Expected '%s', got '%s'", 
tt.expected, result)
+                       }
+               })
+       }
+}
\ No newline at end of file

Reply via email to