lostluck commented on code in PR #32425:
URL: https://github.com/apache/beam/pull/32425#discussion_r1764226783
##########
sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go:
##########
@@ -206,6 +206,20 @@ func (b *B) Cleanup(wk *W) {
wk.mu.Unlock()
}
+func (b *B) Finalize(ctx context.Context, wk *W) (*fnpb.FinalizeBundleResponse, error) {
+ resp := wk.sendInstruction(ctx, &fnpb.InstructionRequest{
+ Request: &fnpb.InstructionRequest_FinalizeBundle{
+ FinalizeBundle: &fnpb.FinalizeBundleRequest{
+ InstructionId: b.InstID,
+ },
+ },
+ })
+ if resp.GetError() != "" {
+ return nil, fmt.Errorf("finalize[%v] error from SDK: %v", b.InstID, resp.GetError())
+ }
+ return resp.GetFinalizeBundle(), nil
Review Comment:
This is, and will likely always be, an empty message. Since this is
internal code, I'd rather not return the empty message at all for now
than speculate that we might want to return it in the future if it
gains fields.
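A minimal, self-contained sketch of the error-only shape suggested here. The
`instructionResponse` type and the `sender` function type are hypothetical
stand-ins for `fnpb.InstructionResponse` and `wk.sendInstruction`, not the
real prism or fnpb APIs; only the return signature is the point.

```go
package main

import "fmt"

// instructionResponse is a hypothetical stand-in for fnpb.InstructionResponse.
// Only the error string matters for this sketch.
type instructionResponse struct {
	errMsg string
}

// GetError mirrors the getter used in the diff above.
func (r *instructionResponse) GetError() string { return r.errMsg }

// sender is a hypothetical stand-in for the worker's sendInstruction method.
type sender func(instID string) *instructionResponse

// finalize follows the shape of B.Finalize but returns only an error,
// since the finalize response itself carries no fields worth returning.
func finalize(instID string, send sender) error {
	resp := send(instID)
	if resp.GetError() != "" {
		return fmt.Errorf("finalize[%v] error from SDK: %v", instID, resp.GetError())
	}
	return nil
}

func main() {
	ok := func(string) *instructionResponse { return &instructionResponse{} }
	bad := func(string) *instructionResponse { return &instructionResponse{errMsg: "boom"} }

	fmt.Println(finalize("inst-1", ok))  // prints "<nil>"
	fmt.Println(finalize("inst-2", bad)) // prints "finalize[inst-2] error from SDK: boom"
}
```

With this shape the call site reduces to `if err := b.Finalize(ctx, wk); err != nil { ... }`, with no discarded first return value.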
##########
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##########
@@ -278,6 +280,14 @@ progress:
slog.Debug("returned empty residual application", "bundle", rb, slog.Int("numResiduals", l), slog.String("pcollection", s.primaryInput))
}
em.PersistBundle(rb, s.OutputsToCoders, b.OutputData, s.inputInfo, residuals)
+ if s.finalize {
+ _, err := b.Finalize(ctx, wk)
+ if err != nil {
+ slog.Debug("SDK Error from bundle finalization", "bundle", rb, "error", err.Error())
+ panic(err)
Review Comment:
At present, this panic is the only reason the tests are failing.
I'd log this with slog.Error instead of slog.Debug, since it could be
useful to users, and a finalization failure isn't expected to fail the job.