This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 25be5af7ccf [GoSDK + Prism] Support Process env execution. (#33651)
25be5af7ccf is described below

commit 25be5af7ccf2824eb97ac88fbb3ff140d3dff8fd
Author: Robert Burke <[email protected]>
AuthorDate: Fri Jan 17 19:28:11 2025 -0800

    [GoSDK + Prism] Support Process env execution. (#33651)
---
 CHANGES.md                                         |  2 ++
 sdks/go/pkg/beam/core/runtime/graphx/translate.go  |  9 +++++--
 .../pkg/beam/core/runtime/graphx/translate_test.go | 31 +++++++++++++++-------
 .../beam/runners/prism/internal/environments.go    | 31 ++++++++++++++++++++++
 4 files changed, 62 insertions(+), 11 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 660fa1c223f..a556e0964a2 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -74,12 +74,14 @@
 * External, Process based Worker Pool support added to the Go SDK container. 
([#33572](https://github.com/apache/beam/pull/33572))
   * This is used to enable sidecar containers to run SDK workers for some 
runners.
   * See https://beam.apache.org/documentation/runtime/sdk-harness-config/ for 
details.
+* Support the Process Environment for execution in the Go SDK. 
([#33651](https://github.com/apache/beam/pull/33651))
 * Prism
   * Prism now uses the same single port for both pipeline submission and 
execution on workers. Requests are differentiated by worker-id. 
([#33438](https://github.com/apache/beam/pull/33438))
     * This avoids port starvation and provides clarity on port use when 
running Prism in non-local environments.
   * Support for @RequiresTimeSortedInputs added. 
([#33513](https://github.com/apache/beam/issues/33513))
   * Initial support for AllowedLateness added. 
([#33542](https://github.com/apache/beam/pull/33542))
   * The Go SDK's inprocess Prism runner (AKA the Go SDK default runner) now 
supports non-loopback mode environment types. 
([#33572](https://github.com/apache/beam/pull/33572))
+  * Support the Process Environment for execution in Prism 
([#33651](https://github.com/apache/beam/pull/33651))
 
 ## Breaking Changes
 
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 1e30d425850..e7a3da7ff10 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -17,6 +17,7 @@ package graphx
 
 import (
        "context"
+       "encoding/json"
        "fmt"
        "sort"
        "strings"
@@ -122,8 +123,12 @@ func CreateEnvironment(ctx context.Context, urn string, 
extractEnvironmentConfig
        var serializedPayload []byte
        switch urn {
        case URNEnvProcess:
-               // TODO Support process based SDK Harness.
-               return nil, errors.Errorf("unsupported environment %v", urn)
+               config := extractEnvironmentConfig(ctx)
+               payload := &pipepb.ProcessPayload{}
+               if err := json.Unmarshal([]byte(config), payload); err != nil {
+                       return nil, fmt.Errorf("unable to json unmarshal 
--environment_config: %w", err)
+               }
+               serializedPayload = protox.MustEncode(payload)
        case URNEnvExternal:
                config := extractEnvironmentConfig(ctx)
                payload := &pipepb.ExternalPayload{Endpoint: 
&pipepb.ApiServiceDescriptor{Url: config}}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
index e18a5f97796..c984f7ee08b 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
@@ -296,22 +296,22 @@ func (fn *splitPickFn) ProcessElement(_ *testRT, a int, 
small, big func(int)) {
 }
 
 func TestCreateEnvironment(t *testing.T) {
-       t.Run("process", func(t *testing.T) {
-               const wantEnv = "process"
+       t.Run("processBadConfig", func(t *testing.T) {
                urn := graphx.URNEnvProcess
-               got, err := graphx.CreateEnvironment(context.Background(), urn, 
func(_ context.Context) string { return wantEnv })
+               got, err := graphx.CreateEnvironment(context.Background(), urn, 
func(_ context.Context) string { return "not a real json" })
                if err == nil {
-                       t.Errorf("CreateEnvironment(%v) = %v error, want error 
since it's unsupported", urn, err)
+                       t.Errorf("CreateEnvironment(%v) = %v error, want error 
since parsing should fail", urn, err)
                }
                want := (*pipepb.Environment)(nil)
                if !proto.Equal(got, want) {
-                       t.Errorf("CreateEnvironment(%v) = %v, want %v since 
it's unsupported", urn, got, want)
+                       t.Errorf("CreateEnvironment(%v) = %v, want %v since 
creation should have failed", urn, got, want)
                }
        })
        tests := []struct {
-               name    string
-               urn     string
-               payload func(name string) []byte
+               name           string
+               configOverride string
+               urn            string
+               payload        func(name string) []byte
        }{
                {
                        name: "external",
@@ -331,12 +331,25 @@ func TestCreateEnvironment(t *testing.T) {
                                        ContainerImage: name,
                                })
                        },
+               }, {
+                       name:           "process",
+                       configOverride: "{ \"command\": \"process\" }",
+                       urn:            graphx.URNEnvProcess,
+                       payload: func(name string) []byte {
+                               return protox.MustEncode(&pipepb.ProcessPayload{
+                                       Command: name,
+                               })
+                       },
                },
        }
        for _, test := range tests {
                test := test
                t.Run(test.name, func(t *testing.T) {
-                       got, err := 
graphx.CreateEnvironment(context.Background(), test.urn, func(_ 
context.Context) string { return test.name })
+                       config := test.name
+                       if test.configOverride != "" {
+                               config = test.configOverride
+                       }
+                       got, err := 
graphx.CreateEnvironment(context.Background(), test.urn, func(_ 
context.Context) string { return config })
                        if err != nil {
                                t.Errorf("CreateEnvironment(%v) = %v error, 
want nil", test.urn, err)
                        }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go 
b/sdks/go/pkg/beam/runners/prism/internal/environments.go
index 25f533b50b9..89a2edabdbb 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/environments.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go
@@ -22,6 +22,8 @@ import (
        "io"
        "log/slog"
        "os"
+       "os/exec"
+       "time"
 
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
@@ -66,6 +68,16 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, 
env string, wk *wor
                        logger.Error("unmarshing docker environment payload", 
"error", err)
                }
                return dockerEnvironment(ctx, logger, dp, wk, 
j.ArtifactEndpoint())
+       case urns.EnvProcess:
+               pp := &pipepb.ProcessPayload{}
+               if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), 
pp); err != nil {
+                       logger.Error("unmarshing docker environment payload", 
"error", err)
+               }
+               go func() {
+                       processEnvironment(ctx, pp, wk)
+                       logger.Debug("environment stopped", slog.String("job", 
j.String()))
+               }()
+               return nil
        default:
                return fmt.Errorf("environment %v with urn %v unimplemented", 
env, e.GetUrn())
        }
@@ -231,3 +243,22 @@ func dockerEnvironment(ctx context.Context, logger 
*slog.Logger, dp *pipepb.Dock
 
        return nil
 }
+
+// processEnvironment runs the SDK worker described by pp as a local child
+// process and blocks until that process exits or ctx is cancelled.
+//
+// The command from pp.GetCommand() is started with the worker's ID and
+// provisioning endpoint passed as --id and --provision_endpoint flags. The
+// child inherits this process's environment, extended with the entries of
+// pp.GetEnv(), and its stdout and stderr are forwarded to this process's own.
+//
+// The function has no return value, so failures to start the command, and
+// exits that were not caused by ctx being cancelled, are reported through
+// slog rather than being dropped.
+func processEnvironment(ctx context.Context, pp *pipepb.ProcessPayload, wk *worker.W) {
+       cmd := exec.CommandContext(ctx, pp.GetCommand(),
+               "--id="+wk.ID,
+               "--provision_endpoint="+wk.Endpoint())
+
+       // Bound how long Wait lingers once ctx has been cancelled.
+       cmd.WaitDelay = time.Millisecond * 100
+       cmd.Stderr = os.Stderr
+       cmd.Stdout = os.Stdout
+
+       // Start from the parent's environment and layer the payload's entries on
+       // top; os/exec keeps the last value for any duplicated key.
+       cmd.Env = os.Environ()
+       for k, v := range pp.GetEnv() {
+               cmd.Env = append(cmd.Env, k+"="+v)
+       }
+
+       if err := cmd.Start(); err != nil {
+               slog.Error("starting process environment",
+                       "command", pp.GetCommand(), "worker", wk.ID, "error", err)
+               return
+       }
+       // Job processing happens here, but orchestrated by other goroutines.
+       // This call blocks until the context is cancelled, or the command exits.
+       if err := cmd.Wait(); err != nil && ctx.Err() == nil {
+               // A cancelled ctx kills the child on purpose, so only an exit that
+               // happens while ctx is still live is worth reporting.
+               slog.Error("process environment exited unexpectedly",
+                       "command", pp.GetCommand(), "worker", wk.ID, "error", err)
+       }
+}

Reply via email to