This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch lostluck-patch-1 in repository https://gitbox.apache.org/repos/asf/beam.git
commit b2b4f66e1c910fff579f9469c3cb8486d8ddf473 Author: Robert Burke <[email protected]> AuthorDate: Wed Apr 1 18:02:42 2020 -0700 [BEAM-9667] Allow metrics in DoFn Setup --- sdks/go/pkg/beam/core/runtime/exec/pardo.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go index aba5d43..14f870f 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go +++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go @@ -75,7 +75,11 @@ func (n *ParDo) Up(ctx context.Context) error { n.status = Up n.inv = newInvoker(n.Fn.ProcessElementFn()) - if _, err := InvokeWithoutEventTime(ctx, n.Fn.SetupFn(), nil); err != nil { + // We can't cache the context during Setup since it runs only once per bundle. + // Subsequent bundles might run this same node, and the context here would be + // incorrectly referring to the older bundleId. + setupCtx := metrics.SetPTransformID(ctx, n.PID) + if _, err := InvokeWithoutEventTime(setupCtx, n.Fn.SetupFn(), nil); err != nil { return n.fail(err) }
