[
https://issues.apache.org/jira/browse/BEAM-7065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818175#comment-16818175
]
Robert Burke commented on BEAM-7065:
------------------------------------
There are two issues here:
1) At the time of the 2.11 Beam release, stats.Mean's MergeAccumulators was
broken. It was recently fixed in [https://github.com/apache/beam/pull/8243]
(see the changes to mean.go and the associated shims).
If you update your Go SDK package to the latest version, it should function.
2) Unlifted combines, which return the URN
beam:transform:combine_grouped_values:v1
aren't presently handled in the Go SDK, so that's a real bug that should be
pretty easy to resolve.
(Unlifted combines can be considered as CreateAccumulator + AddInput +
ExtractOutput, with no MergeAccumulators.)
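To make the distinction concrete, here is a minimal sketch in plain Go with no Beam dependency. The meanAccum and meanFn types and the two helper functions are hypothetical names made up for illustration; they are not the SDK's stats.Mean implementation. The unlifted path only ever calls CreateAccumulator, AddInput and ExtractOutput, while the lifted path builds partial accumulators per bundle and needs MergeAccumulators to join them.

```go
package main

import "fmt"

// meanAccum is a hypothetical accumulator for a mean combine.
type meanAccum struct {
	sum   float64
	count int64
}

// meanFn is an illustrative CombineFn-shaped type, not the SDK's stats.Mean.
type meanFn struct{}

func (meanFn) CreateAccumulator() meanAccum { return meanAccum{} }

func (meanFn) AddInput(a meanAccum, v float64) meanAccum {
	return meanAccum{sum: a.sum + v, count: a.count + 1}
}

func (meanFn) MergeAccumulators(a, b meanAccum) meanAccum {
	return meanAccum{sum: a.sum + b.sum, count: a.count + b.count}
}

func (meanFn) ExtractOutput(a meanAccum) float64 {
	if a.count == 0 {
		return 0
	}
	return a.sum / float64(a.count)
}

// unliftedMean models an unlifted combine: every value for a key is already
// grouped, so MergeAccumulators is never called.
func unliftedMean(values []float64) float64 {
	fn := meanFn{}
	acc := fn.CreateAccumulator()
	for _, v := range values {
		acc = fn.AddInput(acc, v)
	}
	return fn.ExtractOutput(acc)
}

// liftedMean models a lifted combine: each bundle is pre-combined into a
// partial accumulator, and the partials are merged afterwards.
func liftedMean(bundles [][]float64) float64 {
	fn := meanFn{}
	merged := fn.CreateAccumulator()
	for _, bundle := range bundles {
		partial := fn.CreateAccumulator()
		for _, v := range bundle {
			partial = fn.AddInput(partial, v)
		}
		merged = fn.MergeAccumulators(merged, partial)
	}
	return fn.ExtractOutput(merged)
}

func main() {
	fmt.Println(unliftedMean([]float64{24, 20, 22}))
	fmt.Println(liftedMean([][]float64{{24, 20}, {22}}))
}
```

Both paths should agree on the result whenever MergeAccumulators is correct, which is why a broken merge only shows up when the runner lifts the combine.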
It should be as simple as adding the URN here:
[https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/translate.go#L40]
and switching it to extract the combine payload here:
[https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/translate.go#L348]
At which point the usual combine logic should correctly handle the unlifted
combine:
[https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/translate.go#L409]
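As a rough illustration of the shape of that change, the sketch below dispatches on transform URNs in plain Go. It is not the code in translate.go: unitKind and the constant names are hypothetical, and the three lifted-combine URN strings are written from memory of the Beam portability model, so treat them as assumptions. Only the combine_grouped_values URN is taken from the error in this issue.

```go
package main

import "fmt"

const (
	// Lifted combine stages (URN strings assumed, not checked against the SDK).
	urnCombinePre     = "beam:transform:combine_per_key_precombine:v1"
	urnCombineMerge   = "beam:transform:combine_per_key_merge_accumulators:v1"
	urnCombineExtract = "beam:transform:combine_per_key_extract_outputs:v1"
	// Unlifted combine, the URN reported in this issue.
	urnCombineGroupedValues = "beam:transform:combine_grouped_values:v1"
)

// unitKind is a hypothetical stand-in for the URN switch: it reports which
// kind of execution unit a transform URN maps to, or an error if the URN is
// not handled.
func unitKind(urn string) (string, error) {
	switch urn {
	case urnCombinePre, urnCombineMerge, urnCombineExtract,
		urnCombineGroupedValues: // the added case: share the combine path
		return "combine", nil
	default:
		return "", fmt.Errorf("unexpected transform URN: %v", urn)
	}
}

func main() {
	kind, err := unitKind(urnCombineGroupedValues)
	fmt.Println(kind, err)
}
```

The point of the sketch is only that the unlifted URN joins the existing combine cases rather than getting a separate code path.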
As a rule, the SDK prevents purely unlifted CombineFns since they're confusing
from a performance standpoint. One is typically better off using a ParDo for
the same work. The referenced PR actively prevents it from happening.
However, some runners may dynamically determine that *not* lifting a combine is
an optimization, so the SDK needs to support this URN properly regardless.
Aside on versioning:
The Go SDK isn't meaningfully versioned against the rest of Beam, let alone in
the way the Go ecosystem treats versioning. The SDK is Experimental and not 2.X
at this time.
It's a long-standing issue that's complicated because the Beam project uses
Gradle for operational tasks, and the Go Gradle plugin hasn't updated to handle
Go Modules yet. It's under investigation on how to make sure we're testing the
same dependency set that we'll be recommending to users in Go Modules.
> Unable to use unlifted combine functions
> ----------------------------------------
>
> Key: BEAM-7065
> URL: https://issues.apache.org/jira/browse/BEAM-7065
> Project: Beam
> Issue Type: Bug
> Components: sdk-go
> Affects Versions: Not applicable
> Environment: Google Cloud Dataflow
> Reporter: Alexandre Thenorio
> Priority: Major
>
> I have tried running a simple example to calculate a running average or sum
> using the `stats` package; however, it does not seem to work.
>
> Here's a reproducer
>
> {code:java}
> package main
> import (
> "context"
> "encoding/json"
> "flag"
> "fmt"
> "time"
> "cloud.google.com/go/pubsub"
> "github.com/apache/beam/sdks/go/pkg/beam"
> "github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
> "github.com/apache/beam/sdks/go/pkg/beam/log"
> "github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
> "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
> "github.com/apache/beam/sdks/go/pkg/beam/util/pubsubx"
> "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
> "github.com/apache/beam/sdks/go/pkg/beam/x/debug"
> )
> var (
> input = flag.String("input", "iot-data", "Pubsub input topic.")
> )
> type sensor struct {
> name string
> value int
> }
> var (
> data = []sensor{
> {name: "temperature", value: 24},
> {name: "humidity", value: 10},
> {name: "temperature", value: 20},
> {name: "temperature", value: 22},
> {name: "humidity", value: 14},
> {name: "humidity", value: 18},
> }
> )
> func main() {
> flag.Parse()
> beam.Init()
> ctx := context.Background()
> project := gcpopts.GetProject(ctx)
> log.Infof(ctx, "Publishing %v messages to: %v", len(data), *input)
> defer pubsubx.CleanupTopic(ctx, project, *input)
> sub, err := Publish(ctx, project, *input, data...)
> if err != nil {
> log.Fatal(ctx, err)
> }
> log.Infof(ctx, "Running streaming sensor data with subscription: %v",
> sub.ID())
> p := beam.NewPipeline()
> s := p.Root()
> // Reads sensor data from pubsub
> // Returns PCollection<[]byte>
> col := pubsubio.Read(s, project, *input,
> &pubsubio.ReadOptions{Subscription: sub.ID()})
> // Transforms incoming bytes from pubsub to a string,int key value
> // where the key is the sensor name and the value is the sensor reading
> // Accepts PCollection<[]byte>
> // Returns PCollection<KV<string,int>>
> data := beam.ParDo(s, extractSensorData, col)
> // Calculate running average per sensor
> //
> // Accepts PCollection<KV<string,int>>
> // Returns PCollection<KV<string,int>>
> sum := stats.MeanPerKey(s, data)
> debug.Print(s, sum)
> if err := beamx.Run(context.Background(), p); err != nil {
> log.Exitf(ctx, "Failed to execute job: %v", err)
> }
> }
> func extractSensorData(msg []byte) (string, int) {
> ctx := context.Background()
> data := &sensor{}
> if err := json.Unmarshal(msg, data); err != nil {
> log.Fatal(ctx, err)
> }
> return data.name, data.value
> }
> func Publish(ctx context.Context, project, topic string, messages ...sensor)
> (*pubsub.Subscription, error) {
> client, err := pubsub.NewClient(ctx, project)
> if err != nil {
> return nil, err
> }
> t, err := pubsubx.EnsureTopic(ctx, client, topic)
> if err != nil {
> return nil, err
> }
> sub, err := pubsubx.EnsureSubscription(ctx, client, topic,
> fmt.Sprintf("%v.sub.%v", topic, time.Now().Unix()))
> if err != nil {
> return nil, err
> }
> for _, msg := range messages {
> bytes, err := json.Marshal(msg)
> if err != nil {
> return nil, fmt.Errorf("failed to marshal '%v': %v", msg, err)
> }
> m := &pubsub.Message{
> Data: ([]byte)(bytes),
> // Attributes: ??
> }
> id, err := t.Publish(ctx, m).Get(ctx)
> if err != nil {
> return nil, fmt.Errorf("failed to publish '%v': %v", msg, err)
> }
> log.Infof(ctx, "Published %v with id: %v", msg, id)
> }
> return sub, nil
> }
> {code}
>
> I ran this code in the following way
>
> {noformat}
> go run . --project="<my-project>" --runner dataflow --staging_location
> gs://<my-gs-bucket>/binaries/ --temp_location gs://<my-gs-bucket>/tmp/
> --region "europe-west1"
> --worker_harness_container_image=alethenorio/beam-go:v2.11.0
> {noformat}
>
>
>
> The code publishes to pubsub and then reads the messages and attempts to call
> `stats.MeanPerKey` to create a running average.
>
> When deploying this on cloud dataflow, using a container I built myself from
> the v2.11.0 version (alethenorio/beam-go:v2.11.0) I get the following error
> every time
>
>
> {noformat}
> Worker panic: Unexpected transform URN:
> beam:transform:combine_grouped_values:v1goroutine 1 [running]:
> runtime/debug.Stack(0x50, 0x0, 0x0)
> /usr/local/go/src/runtime/debug/stack.go:24 +0x9d
> runtime/debug.PrintStack()
> /usr/local/go/src/runtime/debug/stack.go:16 +0x22
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/init.hook.func1()
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/core/runtime/harness/init/init.go:77
> +0xac
> panic(0xd4b160, 0xc001172f70)
> /usr/local/go/src/runtime/panic.go:522 +0x1b5
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*builder).makeLink(0xc00078b980,
> 0xc0007aa2c0, 0x18, 0xc0007aa180, 0x17, 0x0, 0x400000000000040,
> 0xffffffffffffffff, 0x0, 0x0)
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/core/runtime/exec/translate.go:521
> +0x308f
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*builder).makePCollection(0xc00078b980,
> 0xc0007aa2c0, 0x18, 0xc0011775a0, 0xf, 0x0, 0x0)
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/core/runtime/exec/translate.go:281
> +0x5ff
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.UnmarshalPlan(0xc00078b2c0,
> 0xc00031a0f0, 0xd49820, 0xff5e10)
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/core/runtime/exec/translate.go:71
> +0x393
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc000304300,
> 0x1026be0, 0xc00031a0f0, 0xc000774880, 0xc00031a0f0)
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/core/runtime/harness/harness.go:155
> +0x1ae
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main.func2(0x1026be0,
> 0xc00031a0f0, 0xc000774880)
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/core/runtime/harness/harness.go:114
> +0x1cf
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main(0x1026be0,
> 0xc00031a0f0, 0x7ffe0437db7c, 0xf, 0x7ffe0437db9f, 0xf, 0x0, 0x0)
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/core/runtime/harness/harness.go:129
> +0x786
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/init.hook()
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/core/runtime/harness/init/init.go:86
> +0xee
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime.Init()
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/core/runtime/init.go:42
> +0x50
> github.com/apache/beam/sdks/go/pkg/beam.Init(...)
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/forward.go:111
> main.main()
> /home/localuser/reproducer/main.go:43 +0x8f
> {noformat}
>
> I realize the Go SDK is not stable yet but I was uncertain where to go to
> post this issue in case the devs are not aware so I hope this is fine.
> I get the feeling there is some issue with the gRPC requests sending the
> wrong URN but I couldn't find where in the code the `v1goroutine` gets set (I
> think it needs to be just v1)
>