This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new faeb062 Change conflicting StateReader name from side to reader
(#15348)
faeb062 is described below
commit faeb062fecf6a61b3583ec26254c5466d9175098
Author: Jack McCluskey <[email protected]>
AuthorDate: Thu Aug 19 14:01:46 2021 -0400
Change conflicting StateReader name from side to reader (#15348)
---
sdks/go/pkg/beam/core/runtime/exec/pardo.go | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index f98be23..8cbd08a 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -43,8 +43,8 @@ type ParDo struct {
ctx context.Context
inv *invoker
- side StateReader
- cache *cacheElm
+ reader StateReader
+ cache *cacheElm
status Status
err errorx.GuardedError
@@ -97,7 +97,7 @@ func (n *ParDo) StartBundle(ctx context.Context, id string,
data DataContext) er
return errors.Errorf("invalid status for pardo %v: %v, want
Up", n.UID, n.status)
}
n.status = Active
- n.side = data.State
+ n.reader = data.State
// Allocating contexts all the time is expensive, but we seldom
re-write them,
// and never accept modified contexts from users, so we will cache them
per-bundle
// per-unit, to avoid the constant allocation overhead.
@@ -201,7 +201,7 @@ func (n *ParDo) FinishBundle(_ context.Context) error {
if _, err := n.invokeDataFn(n.ctx, window.SingleGlobalWindow,
mtime.ZeroTimestamp, n.Fn.FinishBundleFn(), nil); err != nil {
return n.fail(err)
}
- n.side = nil
+ n.reader = nil
n.cache = nil
if err := MultiFinishBundle(n.ctx, n.Out...); err != nil {
@@ -216,7 +216,7 @@ func (n *ParDo) Down(ctx context.Context) error {
return n.err.Error()
}
n.status = Down
- n.side = nil
+ n.reader = nil
n.cache = nil
if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil); err
!= nil {
@@ -255,7 +255,7 @@ func (n *ParDo) initSideInput(ctx context.Context, w
typex.Window) error {
streams := make([]ReStream, len(n.Side), len(n.Side))
for i, adapter := range n.Side {
- s, err := adapter.NewIterable(ctx, n.side, w)
+ s, err := adapter.NewIterable(ctx, n.reader, w)
if err != nil {
return err
}