This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 687ed79f1f7 [#28187][prism] worker shutdown, cleanup, log fail, port spec, grpc recv size (#28184)
687ed79f1f7 is described below
commit 687ed79f1f781e42730560e8544a48de5b096a11
Author: Robert Burke <[email protected]>
AuthorDate: Mon Aug 28 13:18:16 2023 -0700
[#28187][prism] worker shutdown, cleanup, log fail, port spec, grpc recv size (#28184)
* [prism] worker shutdown, cleanup, log fail
* Increase prism server receive size to max.
---
sdks/go/pkg/beam/runners/prism/internal/execute.go | 10 ++++++++++
.../pkg/beam/runners/prism/internal/jobservices/job.go | 1 +
.../beam/runners/prism/internal/jobservices/server.go | 3 ++-
sdks/go/pkg/beam/runners/prism/internal/preprocess.go | 2 +-
.../pkg/beam/runners/prism/internal/worker/worker.go | 18 +++++++++++++++---
5 files changed, 29 insertions(+), 5 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 49676710343..42327a0209d 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -20,6 +20,7 @@ import (
"fmt"
"io"
"sort"
+ "time"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
@@ -55,6 +56,15 @@ func RunPipeline(j *jobservices.Job) {
env, _ := getOnlyPair(envs)
wk := worker.New(env) // Cheating by having the worker id match the
environment id.
go wk.Serve()
+ timeout := time.Minute
+ time.AfterFunc(timeout, func() {
+ if wk.Connected() {
+ return
+ }
+ err := fmt.Errorf("prism %v didn't get control connection after
%v", wk, timeout)
+ j.Failed(err)
+ j.CancelFn(err)
+ })
// When this function exits, we cancel the context to clear
// any related job resources.
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
index ed0984323e2..fe4f18bd38e 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -160,6 +160,7 @@ func (j *Job) Done() {
// Failed indicates that the job completed unsuccessfully.
func (j *Job) Failed(err error) {
+ slog.Error("job failed", slog.Any("job", j), slog.Any("error", err))
j.failureErr = err
j.sendState(jobpb.JobState_FAILED)
j.CancelFn(err)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
index aaff03047f6..e3fb7766b51 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
@@ -70,7 +70,8 @@ func (s *Server) getJob(id string) *Job {
}
func (s *Server) Endpoint() string {
- return s.lis.Addr().String()
+ _, port, _ := net.SplitHostPort(s.lis.Addr().String())
+ return fmt.Sprintf("localhost:%v", port)
}
// Serve serves on the started listener. Blocks.
diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
index 96c5f5549b0..bca40709626 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
@@ -139,7 +139,7 @@ func (p *preprocessor) preProcessGraph(comps
*pipepb.Components) []*stage {
keptLeaves := maps.Keys(leaves)
sort.Strings(keptLeaves)
topological := pipelinex.TopologicalSort(ts, keptLeaves)
- slog.Debug("topological transform ordering", topological)
+ slog.Debug("topological transform ordering", slog.Any("topological",
topological))
// Basic Fusion Behavior
//
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
index dab831c20af..405c1e812a4 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -22,6 +22,7 @@ import (
"context"
"fmt"
"io"
+ "math"
"net"
"strconv"
"strings"
@@ -62,6 +63,7 @@ type W struct {
// These are the ID sources
inst, bund uint64
+ connected atomic.Bool
InstReqs chan *fnpb.InstructionRequest
DataReqs chan *fnpb.Elements
@@ -83,7 +85,9 @@ func New(id string) *W {
if err != nil {
panic(fmt.Sprintf("failed to listen: %v", err))
}
- var opts []grpc.ServerOption
+ opts := []grpc.ServerOption{
+ grpc.MaxRecvMsgSize(math.MaxInt32),
+ }
wk := &W{
ID: id,
lis: lis,
@@ -106,7 +110,8 @@ func New(id string) *W {
}
func (wk *W) Endpoint() string {
- return wk.lis.Addr().String()
+ _, port, _ := net.SplitHostPort(wk.lis.Addr().String())
+ return fmt.Sprintf("localhost:%v", port)
}
// Serve serves on the started listener. Blocks.
@@ -200,7 +205,7 @@ func (wk *W) Logging(stream
fnpb.BeamFnLogging_LoggingServer) error {
file = file[:i]
}
- slog.LogAttrs(context.TODO(),
toSlogSev(l.GetSeverity()), l.GetMessage(),
+ slog.LogAttrs(stream.Context(),
toSlogSev(l.GetSeverity()), l.GetMessage(),
slog.Any(slog.SourceKey, &slog.Source{
File: file,
Line: line,
@@ -241,10 +246,15 @@ func (wk *W) GetProcessBundleDescriptor(ctx
context.Context, req *fnpb.GetProces
return desc, nil
}
+func (wk *W) Connected() bool {
+ return wk.connected.Load()
+}
+
// Control relays instructions to SDKs and back again, coordinated via unique
instructionIDs.
//
// Requests come from the runner, and are sent to the client in the SDK.
func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
+ wk.connected.Store(true)
done := make(chan struct{})
go func() {
for {
@@ -281,10 +291,12 @@ func (wk *W) Control(ctrl
fnpb.BeamFnControl_ControlServer) error {
case req := <-wk.InstReqs:
err := ctrl.Send(req)
if err != nil {
+ go func() { <-done }()
return err
}
case <-ctrl.Context().Done():
slog.Debug("Control context canceled")
+ go func() { <-done }()
return ctrl.Context().Err()
case <-done:
slog.Debug("Control done")