This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 25be5af7ccf [GoSDK + Prism] Support Process env execution. (#33651)
25be5af7ccf is described below
commit 25be5af7ccf2824eb97ac88fbb3ff140d3dff8fd
Author: Robert Burke <[email protected]>
AuthorDate: Fri Jan 17 19:28:11 2025 -0800
[GoSDK + Prism] Support Process env execution. (#33651)
---
CHANGES.md | 2 ++
sdks/go/pkg/beam/core/runtime/graphx/translate.go | 9 +++++--
.../pkg/beam/core/runtime/graphx/translate_test.go | 31 +++++++++++++++-------
.../beam/runners/prism/internal/environments.go | 31 ++++++++++++++++++++++
4 files changed, 62 insertions(+), 11 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 660fa1c223f..a556e0964a2 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -74,12 +74,14 @@
* External, Process based Worker Pool support added to the Go SDK container.
([#33572](https://github.com/apache/beam/pull/33572))
* This is used to enable sidecar containers to run SDK workers for some
runners.
* See https://beam.apache.org/documentation/runtime/sdk-harness-config/ for
details.
+* Support the Process Environment for execution in the Go SDK. ([#33651](https://github.com/apache/beam/pull/33651))
* Prism
* Prism now uses the same single port for both pipeline submission and
execution on workers. Requests are differentiated by worker-id.
([#33438](https://github.com/apache/beam/pull/33438))
* This avoids port starvation and provides clarity on port use when
running Prism in non-local environments.
* Support for @RequiresTimeSortedInputs added.
([#33513](https://github.com/apache/beam/issues/33513))
* Initial support for AllowedLateness added.
([#33542](https://github.com/apache/beam/pull/33542))
* The Go SDK's inprocess Prism runner (AKA the Go SDK default runner) now
supports non-loopback mode environment types.
([#33572](https://github.com/apache/beam/pull/33572))
+  * Support the Process Environment for execution in Prism. ([#33651](https://github.com/apache/beam/pull/33651))
## Breaking Changes
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 1e30d425850..e7a3da7ff10 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -17,6 +17,7 @@ package graphx
import (
"context"
+ "encoding/json"
"fmt"
"sort"
"strings"
@@ -122,8 +123,12 @@ func CreateEnvironment(ctx context.Context, urn string,
extractEnvironmentConfig
var serializedPayload []byte
switch urn {
case URNEnvProcess:
- // TODO Support process based SDK Harness.
- return nil, errors.Errorf("unsupported environment %v", urn)
+ config := extractEnvironmentConfig(ctx)
+ payload := &pipepb.ProcessPayload{}
+ if err := json.Unmarshal([]byte(config), payload); err != nil {
+ return nil, fmt.Errorf("unable to json unmarshal
--environment_config: %w", err)
+ }
+ serializedPayload = protox.MustEncode(payload)
case URNEnvExternal:
config := extractEnvironmentConfig(ctx)
payload := &pipepb.ExternalPayload{Endpoint:
&pipepb.ApiServiceDescriptor{Url: config}}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
index e18a5f97796..c984f7ee08b 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
@@ -296,22 +296,22 @@ func (fn *splitPickFn) ProcessElement(_ *testRT, a int,
small, big func(int)) {
}
func TestCreateEnvironment(t *testing.T) {
- t.Run("process", func(t *testing.T) {
- const wantEnv = "process"
+ t.Run("processBadConfig", func(t *testing.T) {
urn := graphx.URNEnvProcess
- got, err := graphx.CreateEnvironment(context.Background(), urn,
func(_ context.Context) string { return wantEnv })
+ got, err := graphx.CreateEnvironment(context.Background(), urn,
func(_ context.Context) string { return "not a real json" })
if err == nil {
- t.Errorf("CreateEnvironment(%v) = %v error, want error
since it's unsupported", urn, err)
+ t.Errorf("CreateEnvironment(%v) = %v error, want error
since parsing should fail", urn, err)
}
want := (*pipepb.Environment)(nil)
if !proto.Equal(got, want) {
- t.Errorf("CreateEnvironment(%v) = %v, want %v since
it's unsupported", urn, got, want)
+ t.Errorf("CreateEnvironment(%v) = %v, want %v since
creation should have failed", urn, got, want)
}
})
tests := []struct {
- name string
- urn string
- payload func(name string) []byte
+ name string
+ configOverride string
+ urn string
+ payload func(name string) []byte
}{
{
name: "external",
@@ -331,12 +331,25 @@ func TestCreateEnvironment(t *testing.T) {
ContainerImage: name,
})
},
+ }, {
+ name: "process",
+ configOverride: "{ \"command\": \"process\" }",
+ urn: graphx.URNEnvProcess,
+ payload: func(name string) []byte {
+ return protox.MustEncode(&pipepb.ProcessPayload{
+ Command: name,
+ })
+ },
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
- got, err :=
graphx.CreateEnvironment(context.Background(), test.urn, func(_
context.Context) string { return test.name })
+ config := test.name
+ if test.configOverride != "" {
+ config = test.configOverride
+ }
+ got, err :=
graphx.CreateEnvironment(context.Background(), test.urn, func(_
context.Context) string { return config })
if err != nil {
t.Errorf("CreateEnvironment(%v) = %v error,
want nil", test.urn, err)
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go
b/sdks/go/pkg/beam/runners/prism/internal/environments.go
index 25f533b50b9..89a2edabdbb 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/environments.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go
@@ -22,6 +22,8 @@ import (
"io"
"log/slog"
"os"
+ "os/exec"
+ "time"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
@@ -66,6 +68,16 @@ func runEnvironment(ctx context.Context, j *jobservices.Job,
env string, wk *wor
logger.Error("unmarshing docker environment payload",
"error", err)
}
return dockerEnvironment(ctx, logger, dp, wk,
j.ArtifactEndpoint())
+	case urns.EnvProcess:
+		pp := &pipepb.ProcessPayload{}
+		if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), pp); err != nil {
+			return fmt.Errorf("unmarshaling process environment payload for environment %v: %w", env, err)
+		}
+		go func() {
+			processEnvironment(ctx, pp, wk)
+			logger.Debug("environment stopped", slog.String("job", j.String()))
+		}()
+		return nil
default:
return fmt.Errorf("environment %v with urn %v unimplemented",
env, e.GetUrn())
}
@@ -231,3 +243,22 @@ func dockerEnvironment(ctx context.Context, logger
*slog.Logger, dp *pipepb.Dock
return nil
}
+
+// processEnvironment runs pp's command for worker wk (passing --id and --provision_endpoint), adding pp's env vars to this process's environment, and blocks until the command exits or ctx is cancelled.
+func processEnvironment(ctx context.Context, pp *pipepb.ProcessPayload, wk *worker.W) {
+	cmd := exec.CommandContext(ctx, pp.GetCommand(), "--id="+wk.ID, "--provision_endpoint="+wk.Endpoint())
+	cmd.WaitDelay = 100 * time.Millisecond
+	cmd.Stderr, cmd.Stdout = os.Stderr, os.Stdout
+	cmd.Env = os.Environ()
+	for k, v := range pp.GetEnv() {
+		cmd.Env = append(cmd.Env, k+"="+v)
+	}
+	if err := cmd.Start(); err != nil {
+		slog.Error("starting process environment", "command", pp.GetCommand(), "worker", wk.ID, "error", err)
+		return
+	}
+	// Job processing is orchestrated by other goroutines; a cancelled ctx kills the process, so only report unexpected exits.
+	if err := cmd.Wait(); err != nil && ctx.Err() == nil {
+		slog.Warn("process environment exited", "command", pp.GetCommand(), "worker", wk.ID, "error", err)
+	}
+}