lostluck commented on code in PR #32425: URL: https://github.com/apache/beam/pull/32425#discussion_r1764226783
########## sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go: ########## @@ -206,6 +206,20 @@ func (b *B) Cleanup(wk *W) { wk.mu.Unlock() } +func (b *B) Finalize(ctx context.Context, wk *W) (*fnpb.FinalizeBundleResponse, error) { + resp := wk.sendInstruction(ctx, &fnpb.InstructionRequest{ + Request: &fnpb.InstructionRequest_FinalizeBundle{ + FinalizeBundle: &fnpb.FinalizeBundleRequest{ + InstructionId: b.InstID, + }, + }, + }) + if resp.GetError() != "" { + return nil, fmt.Errorf("finalize[%v] error from SDK: %v", b.InstID, resp.GetError()) + } + return resp.GetFinalizeBundle(), nil Review Comment: This is and will likely always be an empty message, and since this is internal code, I'd rather simply not return the empty message at this time, than speculate that we might want to return the message in the future if it gains fields. ########## sdks/go/pkg/beam/runners/prism/internal/stage.go: ########## @@ -278,6 +280,14 @@ progress: slog.Debug("returned empty residual application", "bundle", rb, slog.Int("numResiduals", l), slog.String("pcollection", s.primaryInput)) } em.PersistBundle(rb, s.OutputsToCoders, b.OutputData, s.inputInfo, residuals) + if s.finalize { + _, err := b.Finalize(ctx, wk) + if err != nil { + slog.Debug("SDK Error from bundle finalization", "bundle", rb, "error", err.Error()) + panic(err) Review Comment: At present the only reason the tests are failing is due to this panic. I'd make the slog output here be an error instead of debug, since it could be useful to users, and a finalizing failure isn't expected to fail the job. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org