This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3463aa3fde5 [prism] Fail jobs on SDK disconnect. (#28193)
3463aa3fde5 is described below
commit 3463aa3fde5d1c6a3f735ed0951046829142eb41
Author: Robert Burke <[email protected]>
AuthorDate: Fri Sep 1 13:39:48 2023 -0700
[prism] Fail jobs on SDK disconnect. (#28193)
* [prism] Fail jobs on SDK disconnect.
* Reduce flaky short bame for passert test.
* [prism] better workerID, warn on pre-bundle fail, buffer done chan
* Add causes, extract bundle failures to RunPipeline
* Return bundle errors through execPipeline.
---------
Co-authored-by: lostluck <[email protected]>
---
.../prism/internal/engine/elementmanager.go | 17 ++++-
sdks/go/pkg/beam/runners/prism/internal/execute.go | 43 +++++++----
.../beam/runners/prism/internal/jobservices/job.go | 12 ++-
.../prism/internal/jobservices/management.go | 2 +-
sdks/go/pkg/beam/runners/prism/internal/stage.go | 31 +++-----
.../beam/runners/prism/internal/worker/bundle.go | 24 +++---
.../runners/prism/internal/worker/bundle_test.go | 2 +-
.../beam/runners/prism/internal/worker/worker.go | 85 +++++++++++++++-------
.../runners/prism/internal/worker/worker_test.go | 10 +--
sdks/go/pkg/beam/testing/passert/equals_test.go | 2 +-
10 files changed, 142 insertions(+), 86 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index fb9c9802502..df53bce8ac5 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -209,11 +209,11 @@ func (rb RunBundle) LogValue() slog.Value {
// remaining.
func (em *ElementManager) Bundles(ctx context.Context, nextBundID func()
string) <-chan RunBundle {
runStageCh := make(chan RunBundle)
- ctx, cancelFn := context.WithCancel(ctx)
+ ctx, cancelFn := context.WithCancelCause(ctx)
go func() {
em.pendingElements.Wait()
- slog.Info("no more pending elements: terminating pipeline")
- cancelFn()
+ slog.Debug("no more pending elements: terminating pipeline")
+ cancelFn(fmt.Errorf("elementManager out of elements, cleaning
up"))
// Ensure the watermark evaluation goroutine exits.
em.refreshCond.Broadcast()
}()
@@ -394,6 +394,17 @@ func (em *ElementManager) PersistBundle(rb RunBundle,
col2Coders map[string]PCol
em.addRefreshAndClearBundle(stage.ID, rb.BundleID)
}
+// FailBundle clears the extant data allowing the execution to shut down.
+func (em *ElementManager) FailBundle(rb RunBundle) {
+ stage := em.stages[rb.StageID]
+ stage.mu.Lock()
+ completed := stage.inprogress[rb.BundleID]
+ em.pendingElements.Add(-len(completed.es))
+ delete(stage.inprogress, rb.BundleID)
+ stage.mu.Unlock()
+ em.addRefreshAndClearBundle(rb.StageID, rb.BundleID)
+}
+
// ReturnResiduals is called after a successful split, so the remaining work
// can be re-assigned to a new bundle.
func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int,
inputInfo PColInfo, residuals [][]byte) {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 42327a0209d..b2f9d866603 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -54,7 +54,7 @@ func RunPipeline(j *jobservices.Job) {
return
}
env, _ := getOnlyPair(envs)
- wk := worker.New(env) // Cheating by having the worker id match the
environment id.
+ wk := worker.New(j.String()+"_"+env, env) // Cheating by having the
worker id match the environment id.
go wk.Serve()
timeout := time.Minute
time.AfterFunc(timeout, func() {
@@ -69,7 +69,7 @@ func RunPipeline(j *jobservices.Job) {
// When this function exits, we cancel the context to clear
// any related job resources.
defer func() {
- j.CancelFn(nil)
+ j.CancelFn(fmt.Errorf("runPipeline returned, cleaning up"))
}()
go runEnvironment(j.RootCtx, j, env, wk)
@@ -102,10 +102,10 @@ func runEnvironment(ctx context.Context, j
*jobservices.Job, env string, wk *wor
case urns.EnvExternal:
ep := &pipepb.ExternalPayload{}
if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(),
ep); err != nil {
- slog.Error("unmarshing environment payload", err,
slog.String("envID", wk.ID))
+ slog.Error("unmarshing environment payload", err,
slog.String("envID", wk.Env))
}
externalEnvironment(ctx, ep, wk)
- slog.Info("environment stopped", slog.String("envID",
wk.String()), slog.String("job", j.String()))
+ slog.Debug("environment stopped", slog.String("envID",
wk.String()), slog.String("job", j.String()))
default:
panic(fmt.Sprintf("environment %v with urn %v unimplemented",
env, e.GetUrn()))
}
@@ -271,7 +271,7 @@ func executePipeline(ctx context.Context, wk *worker.W, j
*jobservices.Job) erro
}
stages[stage.ID] = stage
wk.Descriptors[stage.ID] = stage.desc
- case wk.ID:
+ case wk.Env:
// Great! this is for this environment. // Broken
abstraction.
if err := buildDescriptor(stage, comps, wk); err != nil
{
return fmt.Errorf("prism error building stage
%v: \n%w", stage.ID, err)
@@ -296,16 +296,31 @@ func executePipeline(ctx context.Context, wk *worker.W, j
*jobservices.Job) erro
// Use a channel to limit max parallelism for the pipeline.
maxParallelism := make(chan struct{}, 8)
// Execute stages here
- for rb := range em.Bundles(ctx, wk.NextInst) {
- maxParallelism <- struct{}{}
- go func(rb engine.RunBundle) {
- defer func() { <-maxParallelism }()
- s := stages[rb.StageID]
- s.Execute(ctx, j, wk, comps, em, rb)
- }(rb)
+ bundleFailed := make(chan error)
+ bundles := em.Bundles(ctx, wk.NextInst)
+ for {
+ select {
+ case <-ctx.Done():
+ return context.Cause(ctx)
+ case rb, ok := <-bundles:
+ if !ok {
+ slog.Debug("pipeline done!", slog.String("job",
j.String()))
+ return nil
+ }
+ maxParallelism <- struct{}{}
+ go func(rb engine.RunBundle) {
+ defer func() { <-maxParallelism }()
+ s := stages[rb.StageID]
+ if err := s.Execute(ctx, j, wk, comps, em, rb);
err != nil {
+ // Ensure we clean up on bundle failure
+ em.FailBundle(rb)
+ bundleFailed <- err
+ }
+ }(rb)
+ case err := <-bundleFailed:
+ return err
+ }
}
- slog.Info("pipeline done!", slog.String("job", j.String()))
- return nil
}
func collectionPullDecoder(coldCId string, coders map[string]*pipepb.Coder,
comps *pipepb.Components) func(io.Reader) []byte {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
index fe4f18bd38e..10d36066391 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -137,9 +137,13 @@ func (j *Job) SendMsg(msg string) {
func (j *Job) sendState(state jobpb.JobState_Enum) {
j.streamCond.L.Lock()
defer j.streamCond.L.Unlock()
- j.stateTime = time.Now()
- j.stateIdx++
- j.state.Store(state)
+ old := j.state.Load()
+ // Never overwrite a failed state with another one.
+ if old != jobpb.JobState_FAILED {
+ j.state.Store(state)
+ j.stateTime = time.Now()
+ j.stateIdx++
+ }
j.streamCond.Broadcast()
}
@@ -163,5 +167,5 @@ func (j *Job) Failed(err error) {
slog.Error("job failed", slog.Any("job", j), slog.Any("error", err))
j.failureErr = err
j.sendState(jobpb.JobState_FAILED)
- j.CancelFn(err)
+ j.CancelFn(fmt.Errorf("jobFailed %v: %w", j, err))
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
index d347e88ec60..e626a05b51e 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -226,7 +226,7 @@ func (s *Server) GetMessageStream(req
*jobpb.JobMessagesRequest, stream jobpb.Jo
job.streamCond.Wait()
select { // Quit out if the external connection is done.
case <-stream.Context().Done():
- return stream.Context().Err()
+ return context.Cause(stream.Context())
default:
}
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go
b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index 3f4451d7db3..4d8d4621168 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -75,12 +75,7 @@ type stage struct {
OutputsToCoders map[string]engine.PColInfo
}
-func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W,
comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) {
- select {
- case <-ctx.Done():
- return
- default:
- }
+func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W,
comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) error
{
slog.Debug("Execute: starting bundle", "bundle", rb)
var b *worker.B
@@ -103,7 +98,7 @@ func (s *stage) Execute(ctx context.Context, j
*jobservices.Job, wk *worker.W, c
closed := make(chan struct{})
close(closed)
dataReady = closed
- case wk.ID:
+ case wk.Env:
b = &worker.B{
PBDID: s.ID,
InstID: rb.BundleID,
@@ -122,15 +117,10 @@ func (s *stage) Execute(ctx context.Context, j
*jobservices.Job, wk *worker.W, c
slog.Debug("Execute: processing", "bundle", rb)
defer b.Cleanup(wk)
- b.Fail = func(errMsg string) {
- slog.Error("job failed", "bundle", rb, "job", j)
- err := fmt.Errorf("%v", errMsg)
- j.Failed(err)
- }
dataReady = b.ProcessOn(ctx, wk)
default:
err := fmt.Errorf("unknown environment[%v]", s.envID)
- slog.Error("Execute", err)
+ slog.Error("Execute", "error", err)
panic(err)
}
@@ -145,20 +135,20 @@ progress:
progTick.Stop()
break progress // exit progress loop on close.
case <-progTick.C:
- resp, err := b.Progress(wk)
+ resp, err := b.Progress(ctx, wk)
if err != nil {
slog.Debug("SDK Error from progress, aborting
progress", "bundle", rb, "error", err.Error())
break progress
}
index, unknownIDs := j.ContributeTentativeMetrics(resp)
if len(unknownIDs) > 0 {
- md := wk.MonitoringMetadata(unknownIDs)
+ md := wk.MonitoringMetadata(ctx, unknownIDs)
j.AddMetricShortIDs(md)
}
slog.Debug("progress report", "bundle", rb, "index",
index)
// Progress for the bundle hasn't advanced. Try
splitting.
if previousIndex == index && !splitsDone {
- sr, err := b.Split(wk, 0.5 /* fraction of
remainder */, nil /* allowed splits */)
+ sr, err := b.Split(ctx, wk, 0.5 /* fraction of
remainder */, nil /* allowed splits */)
if err != nil {
slog.Warn("SDK Error from split,
aborting splits", "bundle", rb, "error", err.Error())
break progress
@@ -200,16 +190,18 @@ progress:
var resp *fnpb.ProcessBundleResponse
select {
case resp = <-b.Resp:
+ if b.BundleErr != nil {
+ return b.BundleErr
+ }
case <-ctx.Done():
- // Ensures we clean up on failure, if the response is blocked.
- return
+ return context.Cause(ctx)
}
// Tally metrics immeadiately so they're available before
// pipeline termination.
unknownIDs := j.ContributeFinalMetrics(resp)
if len(unknownIDs) > 0 {
- md := wk.MonitoringMetadata(unknownIDs)
+ md := wk.MonitoringMetadata(ctx, unknownIDs)
j.AddMetricShortIDs(md)
}
// TODO handle side input data properly.
@@ -239,6 +231,7 @@ progress:
}
em.PersistBundle(rb, s.OutputsToCoders, b.OutputData, s.inputInfo,
residualData, minOutputWatermark)
b.OutputData = engine.TentativeData{} // Clear the data.
+ return nil
}
func getSideInputs(t *pipepb.PTransform) (map[string]*pipepb.SideInput, error)
{
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
index d17deedec8d..98479e3db07 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
@@ -52,14 +52,11 @@ type B struct {
dataSema atomic.Int32
OutputData engine.TentativeData
- // TODO move response channel to an atomic and an additional
- // block on the DataWait channel, to allow progress & splits for
- // no output DoFns.
- Resp chan *fnpb.ProcessBundleResponse
+ Resp chan *fnpb.ProcessBundleResponse
+ BundleErr error
+ responded bool
SinkToPCollection map[string]string
-
- Fail func(err string) // Called if bundle returns an error.
}
// Init initializes the bundle's internal state for waiting on all
@@ -90,8 +87,13 @@ func (b *B) LogValue() slog.Value {
}
func (b *B) Respond(resp *fnpb.InstructionResponse) {
+ if b.responded {
+ slog.Warn("additional bundle response", "bundle", b, "resp",
resp)
+ return
+ }
+ b.responded = true
if resp.GetError() != "" {
- b.Fail(resp.GetError())
+ b.BundleErr = fmt.Errorf("bundle %v failed:%v",
resp.GetInstructionId(), resp.GetError())
close(b.Resp)
return
}
@@ -152,8 +154,8 @@ func (b *B) Cleanup(wk *W) {
}
// Progress sends a progress request for the given bundle to the passed in
worker, blocking on the response.
-func (b *B) Progress(wk *W) (*fnpb.ProcessBundleProgressResponse, error) {
- resp := wk.sendInstruction(&fnpb.InstructionRequest{
+func (b *B) Progress(ctx context.Context, wk *W)
(*fnpb.ProcessBundleProgressResponse, error) {
+ resp := wk.sendInstruction(ctx, &fnpb.InstructionRequest{
Request: &fnpb.InstructionRequest_ProcessBundleProgress{
ProcessBundleProgress:
&fnpb.ProcessBundleProgressRequest{
InstructionId: b.InstID,
@@ -167,8 +169,8 @@ func (b *B) Progress(wk *W)
(*fnpb.ProcessBundleProgressResponse, error) {
}
// Split sends a split request for the given bundle to the passed in worker,
blocking on the response.
-func (b *B) Split(wk *W, fraction float64, allowedSplits []int64)
(*fnpb.ProcessBundleSplitResponse, error) {
- resp := wk.sendInstruction(&fnpb.InstructionRequest{
+func (b *B) Split(ctx context.Context, wk *W, fraction float64, allowedSplits
[]int64) (*fnpb.ProcessBundleSplitResponse, error) {
+ resp := wk.sendInstruction(ctx, &fnpb.InstructionRequest{
Request: &fnpb.InstructionRequest_ProcessBundleSplit{
ProcessBundleSplit: &fnpb.ProcessBundleSplitRequest{
InstructionId: b.InstID,
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go
b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go
index c747711f8f0..ba5b10f5fd3 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go
@@ -23,7 +23,7 @@ import (
)
func TestBundle_ProcessOn(t *testing.T) {
- wk := New("test")
+ wk := New("test", "testEnv")
b := &B{
InstID: "testInst",
PBDID: "testPBDID",
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
index 405c1e812a4..0ad7ccb3703 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -55,15 +55,15 @@ type W struct {
fnpb.UnimplementedBeamFnLoggingServer
fnpb.UnimplementedProvisionServiceServer
- ID string
+ ID, Env string
// Server management
lis net.Listener
server *grpc.Server
// These are the ID sources
- inst, bund uint64
- connected atomic.Bool
+ inst, bund uint64
+ connected, stopped atomic.Bool
InstReqs chan *fnpb.InstructionRequest
DataReqs chan *fnpb.Elements
@@ -80,7 +80,7 @@ type controlResponder interface {
}
// New starts the worker server components of FnAPI Execution.
-func New(id string) *W {
+func New(id, env string) *W {
lis, err := net.Listen("tcp", ":0")
if err != nil {
panic(fmt.Sprintf("failed to listen: %v", err))
@@ -90,6 +90,7 @@ func New(id string) *W {
}
wk := &W{
ID: id,
+ Env: env,
lis: lis,
server: grpc.NewServer(opts...),
@@ -133,6 +134,7 @@ func (wk *W) LogValue() slog.Value {
// Stop the GRPC server.
func (wk *W) Stop() {
slog.Debug("stopping", "worker", wk)
+ wk.stopped.Store(true)
close(wk.InstReqs)
close(wk.DataReqs)
wk.server.Stop()
@@ -246,6 +248,7 @@ func (wk *W) GetProcessBundleDescriptor(ctx
context.Context, req *fnpb.GetProces
return desc, nil
}
+// Connected indicates whether the worker has connected to the control RPC.
func (wk *W) Connected() bool {
return wk.connected.Load()
}
@@ -255,27 +258,26 @@ func (wk *W) Connected() bool {
// Requests come from the runner, and are sent to the client in the SDK.
func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
wk.connected.Store(true)
- done := make(chan struct{})
+ done := make(chan error, 1)
go func() {
for {
resp, err := ctrl.Recv()
if err == io.EOF {
slog.Debug("ctrl.Recv finished; marking done",
"worker", wk)
- done <- struct{}{} // means stream is finished
+ done <- nil // means stream is finished
return
}
if err != nil {
switch status.Code(err) {
case codes.Canceled:
- done <- struct{}{} // means stream is
finished
+ done <- err // means stream is finished
return
default:
- slog.Error("ctrl.Recv failed", err,
"worker", wk)
+ slog.Error("ctrl.Recv failed", "error",
err, "worker", wk)
panic(err)
}
}
- // TODO: Do more than assume these are
ProcessBundleResponses.
wk.mu.Lock()
if b, ok :=
wk.activeInstructions[resp.GetInstructionId()]; ok {
b.Respond(resp)
@@ -288,19 +290,33 @@ func (wk *W) Control(ctrl
fnpb.BeamFnControl_ControlServer) error {
for {
select {
- case req := <-wk.InstReqs:
- err := ctrl.Send(req)
- if err != nil {
- go func() { <-done }()
+ case req, ok := <-wk.InstReqs:
+ if !ok {
+ slog.Debug("Worker shutting down.", "worker",
wk)
+ return nil
+ }
+ if err := ctrl.Send(req); err != nil {
return err
}
case <-ctrl.Context().Done():
- slog.Debug("Control context canceled")
- go func() { <-done }()
- return ctrl.Context().Err()
- case <-done:
- slog.Debug("Control done")
- return nil
+ wk.mu.Lock()
+ // Fail extant instructions
+ slog.Debug("SDK Disconnected", "worker", wk,
"ctx_error", ctrl.Context().Err(), "outstanding_instructions",
len(wk.activeInstructions))
+ for instID, b := range wk.activeInstructions {
+ b.Respond(&fnpb.InstructionResponse{
+ InstructionId: instID,
+ Error: "SDK Disconnected",
+ })
+ }
+ wk.mu.Unlock()
+ return context.Cause(ctrl.Context())
+ case err := <-done:
+ if err != nil {
+ slog.Warn("Control done", "error", err,
"worker", wk)
+ } else {
+ slog.Debug("Control done", "worker", wk)
+ }
+ return err
}
}
}
@@ -359,7 +375,7 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
}
case <-data.Context().Done():
slog.Debug("Data context canceled")
- return data.Context().Err()
+ return context.Cause(data.Context())
}
}
}
@@ -394,8 +410,12 @@ func (wk *W) State(state fnpb.BeamFnState_StateServer)
error {
// State requests are always for an active
ProcessBundle instruction
wk.mu.Lock()
- b :=
wk.activeInstructions[req.GetInstructionId()].(*B)
+ b, ok :=
wk.activeInstructions[req.GetInstructionId()].(*B)
wk.mu.Unlock()
+ if !ok {
+ slog.Warn("state request after bundle
inactive", "instruction", req.GetInstructionId(), "worker", wk)
+ continue
+ }
key := req.GetStateKey()
slog.Debug("StateRequest_Get",
prototext.Format(req), "bundle", b)
@@ -490,7 +510,7 @@ func (cr *chanResponder) Respond(resp
*fnpb.InstructionResponse) {
// sendInstruction is a helper for creating and sending worker single RPCs,
blocking
// until the response returns.
-func (wk *W) sendInstruction(req *fnpb.InstructionRequest)
*fnpb.InstructionResponse {
+func (wk *W) sendInstruction(ctx context.Context, req
*fnpb.InstructionRequest) *fnpb.InstructionResponse {
cr := chanResponderPool.Get().(*chanResponder)
progInst := wk.NextInst()
wk.mu.Lock()
@@ -506,15 +526,26 @@ func (wk *W) sendInstruction(req
*fnpb.InstructionRequest) *fnpb.InstructionResp
req.InstructionId = progInst
- // Tell the SDK to start processing the bundle.
+ if wk.stopped.Load() {
+ return nil
+ }
wk.InstReqs <- req
- // Protos are safe as nil, so just return directly.
- return <-cr.Resp
+
+ select {
+ case <-ctx.Done():
+ return &fnpb.InstructionResponse{
+ InstructionId: progInst,
+ Error: "context canceled before receive",
+ }
+ case resp := <-cr.Resp:
+ // Protos are safe as nil, so just return directly.
+ return resp
+ }
}
// MonitoringMetadata is a convenience method to request the metadata for
monitoring shortIDs.
-func (wk *W) MonitoringMetadata(unknownIDs []string)
*fnpb.MonitoringInfosMetadataResponse {
- return wk.sendInstruction(&fnpb.InstructionRequest{
+func (wk *W) MonitoringMetadata(ctx context.Context, unknownIDs []string)
*fnpb.MonitoringInfosMetadataResponse {
+ return wk.sendInstruction(ctx, &fnpb.InstructionRequest{
Request: &fnpb.InstructionRequest_MonitoringInfos{
MonitoringInfos: &fnpb.MonitoringInfosMetadataRequest{
MonitoringInfoId: unknownIDs,
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
index 060c073fa12..ed61f484481 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
@@ -32,14 +32,14 @@ import (
)
func TestWorker_New(t *testing.T) {
- w := New("test")
+ w := New("test", "testEnv")
if got, want := w.ID, "test"; got != want {
t.Errorf("New(%q) = %v, want %v", want, got, want)
}
}
func TestWorker_NextInst(t *testing.T) {
- w := New("test")
+ w := New("test", "testEnv")
instIDs := map[string]struct{}{}
for i := 0; i < 100; i++ {
@@ -51,7 +51,7 @@ func TestWorker_NextInst(t *testing.T) {
}
func TestWorker_NextStage(t *testing.T) {
- w := New("test")
+ w := New("test", "testEnv")
stageIDs := map[string]struct{}{}
for i := 0; i < 100; i++ {
@@ -63,7 +63,7 @@ func TestWorker_NextStage(t *testing.T) {
}
func TestWorker_GetProcessBundleDescriptor(t *testing.T) {
- w := New("test")
+ w := New("test", "testEnv")
id := "available"
w.Descriptors[id] = &fnpb.ProcessBundleDescriptor{
@@ -93,7 +93,7 @@ func serveTestWorker(t *testing.T) (context.Context, *W,
*grpc.ClientConn) {
ctx, cancelFn := context.WithCancel(context.Background())
t.Cleanup(cancelFn)
- w := New("test")
+ w := New("test", "testEnv")
lis := bufconn.Listen(2048)
w.lis = lis
t.Cleanup(func() { w.Stop() })
diff --git a/sdks/go/pkg/beam/testing/passert/equals_test.go
b/sdks/go/pkg/beam/testing/passert/equals_test.go
index a8a5c835f8f..6e6578dd3e4 100644
--- a/sdks/go/pkg/beam/testing/passert/equals_test.go
+++ b/sdks/go/pkg/beam/testing/passert/equals_test.go
@@ -184,7 +184,7 @@ func ExampleEqualsList_mismatch() {
err = unwrapError(err)
// Process error for cleaner example output, demonstrating the diff.
- processedErr := strings.SplitAfter(err.Error(),
"/passert.failIfBadEntries] failed:")
+ processedErr := strings.SplitAfter(err.Error(), ".failIfBadEntries]
failed:")
fmt.Println(processedErr[1])
// Output: