This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch lostluck-patch-1
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b2b4f66e1c910fff579f9469c3cb8486d8ddf473
Author: Robert Burke <[email protected]>
AuthorDate: Wed Apr 1 18:02:42 2020 -0700

    [BEAM-9667] Allow metrics in DoFn Setup
---
 sdks/go/pkg/beam/core/runtime/exec/pardo.go | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go 
b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index aba5d43..14f870f 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -75,7 +75,11 @@ func (n *ParDo) Up(ctx context.Context) error {
        n.status = Up
        n.inv = newInvoker(n.Fn.ProcessElementFn())
 
-       if _, err := InvokeWithoutEventTime(ctx, n.Fn.SetupFn(), nil); err != 
nil {
+       // We can't cache the context during Setup since it runs only once per 
bundle.
+       // Subsequent bundles might run this same node, and the context here 
would be 
+       // incorrectly refering to the older bundleId.
+       setupCtx :=  metrics.SetPTransformID(ctx, n.PID)
+       if _, err := InvokeWithoutEventTime(setupCtx, n.Fn.SetupFn(), nil); err 
!= nil {
                return n.fail(err)
        }
 

Reply via email to