[https://issues.apache.org/jira/browse/BEAM-12979?focusedWorklogId=659602&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-659602]
ASF GitHub Bot logged work on BEAM-12979:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 04/Oct/21 14:34
Start Date: 04/Oct/21 14:34
Worklog Time Spent: 10m
Work Description: jrmccluskey commented on a change in pull request #15639:
URL: https://github.com/apache/beam/pull/15639#discussion_r721426503
##########
File path: sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go
##########
@@ -21,25 +21,45 @@
package statecache
import (
+ "io"
"sync"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
)
type token string
-// ReusableInput is a resettable value, notably used to unwind iterators cheaply
-// and cache materialized side input across invocations.
+// FullValue represents the full runtime value for a data element, incl. the
+// implicit context. The result of a GBK or CoGBK is not a single FullValue.
+// The consumer is responsible for converting the values to the correct type.
+// To represent a nested KV with FullValues, assign a *FullValue to Elm/Elm2.
//
-// Redefined from exec's input.go to avoid a cyclical dependency.
-type ReusableInput interface {
- // Init initializes the value before use.
- Init() error
- // Value returns the side input value.
- Value() interface{}
- // Reset resets the value after use.
- Reset() error
+// Copied from exec/fullvalue.go to avoid cyclical dependencies.
+type FullValue struct {
+ Elm interface{} // Element or KV key.
+ Elm2 interface{} // KV value, if not invalid
+
+ Timestamp typex.EventTime
+ Windows []typex.Window
+ Pane typex.PaneInfo
+}
+
+// Stream is a FullValue reader. It returns io.EOF when complete, but can be
+// prematurely closed.
+//
+// Copied from exec/fullvalue.go to prevent cyclical dependencies.
+type Stream interface {
+ io.Closer
+ Read() (*FullValue, error)
Review comment:
I've taken a run at fixing this and have precommits passing. I'm surprised,
though: the previous version with ReusableInputs did the same thing without
typing issues.
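For readers following the thread, here is a minimal sketch of how a consumer might use the `Stream` interface quoted in the diff: call `Read` until it returns `io.EOF`, and always call `Close`, since the doc comment notes the stream can be closed before it is exhausted. The `sliceStream` type and `drain` helper are hypothetical and not part of the PR, and `FullValue` is trimmed to `Elm`/`Elm2` so the sketch compiles without Beam imports. It also shows a nested KV built by assigning a `*FullValue` to `Elm2`, as the `FullValue` doc comment describes.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// FullValue is a trimmed copy of the struct in the diff; Timestamp,
// Windows, and Pane are omitted to avoid importing Beam's typex package.
type FullValue struct {
	Elm  interface{} // Element or KV key.
	Elm2 interface{} // KV value, if present.
}

// Stream has the same shape as the interface quoted in the diff.
type Stream interface {
	io.Closer
	Read() (*FullValue, error)
}

// sliceStream is a hypothetical in-memory Stream used only for illustration.
type sliceStream struct {
	vals   []*FullValue
	pos    int
	closed bool
}

// Read returns the next value, io.EOF when exhausted, or an error after Close.
func (s *sliceStream) Read() (*FullValue, error) {
	if s.closed {
		return nil, errors.New("stream closed")
	}
	if s.pos >= len(s.vals) {
		return nil, io.EOF
	}
	v := s.vals[s.pos]
	s.pos++
	return v, nil
}

// Close marks the stream closed; later Reads fail.
func (s *sliceStream) Close() error {
	s.closed = true
	return nil
}

// drain reads values until io.EOF and always closes the stream,
// reporting the Close error only if reading succeeded.
func drain(s Stream) (out []*FullValue, err error) {
	defer func() {
		if cerr := s.Close(); err == nil {
			err = cerr
		}
	}()
	for {
		v, rerr := s.Read()
		if rerr == io.EOF {
			return out, nil
		}
		if rerr != nil {
			return out, rerr
		}
		out = append(out, v)
	}
}

func main() {
	s := &sliceStream{vals: []*FullValue{
		{Elm: "a", Elm2: 1},
		// Nested KV: a *FullValue assigned to Elm2.
		{Elm: "b", Elm2: &FullValue{Elm: "c", Elm2: 2}},
	}}
	vals, err := drain(s)
	if err != nil {
		panic(err)
	}
	for _, v := range vals {
		fmt.Printf("%v -> %v\n", v.Elm, v.Elm2)
	}
	fmt.Println("closed:", s.closed)
}
```

Deferring `Close` in `drain` means the stream is released whether the loop ends at `io.EOF` or on an error.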
Issue Time Tracking
-------------------
Worklog Id: (was: 659602)
Time Spent: 50m (was: 40m)
> [Go SDK] Avoid shallow copying CacheToken protobufs in statecache.go
> --------------------------------------------------------------------
>
> Key: BEAM-12979
> URL: https://issues.apache.org/jira/browse/BEAM-12979
> Project: Beam
> Issue Type: Bug
> Components: sdk-go
> Reporter: Daniel Oliveira
> Assignee: Jack McCluskey
> Priority: P2
> Time Spent: 50m
> Remaining Estimate: 0h
>
> Copying protobuf messages by value should be avoided because it causes many
> subtle bugs, especially when the mutexes inside the message get copied. We
> should be passing pointers instead.
> Some spots where CacheTokens are copied instead of passed as pointers:
> [https://github.com/apache/beam/blob/f051cd91d46e5dab0ca48f108b27d9d87e6e5e8f/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go#L92]
> [https://github.com/apache/beam/blob/f051cd91d46e5dab0ca48f108b27d9d87e6e5e8f/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go#L124]
> [https://github.com/apache/beam/blob/f051cd91d46e5dab0ca48f108b27d9d87e6e5e8f/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache_test.go#L119]
> This isn't an exhaustive list, but it covers some of the major instances.
> It should be easy enough to ctrl+f "fnpb.ProcessBundleRequest_CacheToken"
> and make sure it's used as a pointer wherever possible.
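A minimal sketch of the pointer-based pattern the issue asks for. It uses a hypothetical `CacheToken` stand-in rather than the generated `fnpb.ProcessBundleRequest_CacheToken`, and a hypothetical `Request` type holding a `[]*CacheToken` slice. The embedded `sync.Mutex` models the internal state the issue warns about: dereferencing an element (`tok := *t`) or ranging over a value slice would copy that mutex, and `go vet`'s copylocks check reports such copies. Storing the pointer keeps every reference tied to the original message.

```go
package main

import (
	"fmt"
	"sync"
)

// CacheToken is a hypothetical stand-in for the generated protobuf type.
// The mutex plays the role of the internal state that must not be copied.
type CacheToken struct {
	mu    sync.Mutex
	Token []byte
}

// Request is a hypothetical stand-in for a request carrying cache tokens.
// Repeated message fields in generated Go code are slices of pointers.
type Request struct {
	CacheTokens []*CacheToken
}

// indexTokens maps each token's bytes to the original *CacheToken.
// Writing `tok := *t` here instead would copy the struct and its mutex.
func indexTokens(req *Request) map[string]*CacheToken {
	idx := make(map[string]*CacheToken, len(req.CacheTokens))
	for _, t := range req.CacheTokens {
		idx[string(t.Token)] = t // store the pointer, not a copy
	}
	return idx
}

func main() {
	req := &Request{CacheTokens: []*CacheToken{
		{Token: []byte("user-state")},
		{Token: []byte("side-input")},
	}}
	idx := indexTokens(req)
	// prints: 2 true
	fmt.Println(len(idx), idx["user-state"] == req.CacheTokens[0])
}
```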
--
This message was sent by Atlassian Jira
(v8.3.4#803005)