[ https://issues.apache.org/jira/browse/BEAM-7065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16826096#comment-16826096 ]

Alexandre Thenorio edited comment on BEAM-7065 at 4/25/19 2:02 PM:
-------------------------------------------------------------------

Thank you very much [~lostluck]. I have forked the SDK and updated it to allow 
unlifted combines (the change can be found at 
[https://github.com/ByteFlinger/beam/commit/c94cf9a9bbcdb41ce8b55e27b3959ab043c96455]) 
and built another image based on it (alethenorio/beam-go:unlifted_combines).
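
For reference, a minimal sketch of the invocation, assuming the same flags as in the issue description below, just with the new worker harness image swapped in:
{noformat}
go run . --project="<my-project>" --runner dataflow --staging_location gs://<my-gs-bucket>/binaries/ --temp_location gs://<my-gs-bucket>/tmp/ --region "europe-west1" --worker_harness_container_image=alethenorio/beam-go:unlifted_combines{noformat}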

 

It no longer throws the error; however, I see no debug prints. I also see a 
few of these warnings in the Stackdriver logs, though I am uncertain whether 
they are relevant:
{noformat}
exception: "java.util.concurrent.ExecutionException: 
java.lang.RuntimeException: Error received from SDK harness for instruction 
-68: execution plan for -56 not found at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at 
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) at 
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:347)
 at 
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:334)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: 
Error received from SDK harness for instruction -68: execution plan for -56 not 
found at 
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
 at 
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
 at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
 at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
 at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
 at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
 at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
 at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
 at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
 ... 3 more{noformat}
I then wondered whether it had to do with the fact that I don't have a window, 
so I added one and also took pubsub out of the equation to make sure nothing 
else was the issue:

 
{code:java}
package main


import (
    "context"
    "encoding/json"
    "flag"
    "reflect"
    "time"


    "github.com/apache/beam/sdks/go/pkg/beam"
    "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
    "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
    "github.com/apache/beam/sdks/go/pkg/beam/log"
    "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
    "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
)


type Sensor struct {
    Name      string
    Value     int
    Timestamp int64
}


var (
    now  = time.Now()
    data = []Sensor{
        {Name: "temperature", Value: 20, Timestamp: now.Unix()},
        {Name: "temperature", Value: 22, Timestamp: now.Add(1 * 
time.Second).Unix()},
        {Name: "temperature", Value: 24, Timestamp: now.Add(2 * 
time.Second).Unix()},
        {Name: "temperature", Value: 26, Timestamp: now.Add(3 * 
time.Second).Unix()},
        {Name: "humidity", Value: 10, Timestamp: now.Unix()},
        {Name: "humidity", Value: 12, Timestamp: now.Add(1 * 
time.Second).Unix()},
        {Name: "humidity", Value: 14, Timestamp: now.Add(2 * 
time.Second).Unix()},
        {Name: "humidity", Value: 16, Timestamp: now.Add(3 * 
time.Second).Unix()},
    }
)


func init() {
    beam.RegisterType(reflect.TypeOf((*addTimestampFn)(nil)).Elem())
    beam.RegisterType(reflect.TypeOf((*Sensor)(nil)).Elem())
}


func main() {
    flag.Parse()
    beam.Init()


    ctx := context.Background()


    p := beam.NewPipeline()
    s := p.Root()


    // Emits fixed sensor data
    // Returns PCollection<[]byte>
    col := beam.Create(s, Bytes(data...)...)


    // Transforms incoming bytes into a *Sensor
    // Accepts PCollection<[]byte>
    // Returns PCollection<*Sensor>
    sensor := beam.ParDo(s, extractSensorData, col)


    // Injects the tag timestamp as the timestamp to be used by the window operation
    // Accepts PCollection<*Sensor>
    // Returns PCollection<*Sensor> with window timestamps
    tsSensor := beam.ParDo(s, &addTimestampFn{Min: mtime.Now()}, sensor)


    // Transforms a *Sensor into a string,int key value
    // where the key is the sensor name and the value is the sensor reading
    // Accepts PCollection<*Sensor>
    // Returns PCollection<KV<string,int>>
    data := beam.ParDo(s, extractKV, tsSensor)


    // Split into windows of 2 seconds every second
    window := beam.WindowInto(s, window.NewSlidingWindows(time.Second, 2*time.Second), data)


    // Calculate running average per sensor
    //
    // Accepts PCollection<KV<string,int>>
    // Returns PCollection<KV<string,int>>
    avg := stats.MeanPerKey(s, window)


    // Prints the values for each window average
    beam.ParDo0(s, printValue, avg)


    if err := beamx.Run(context.Background(), p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }
}


func Bytes(data ...Sensor) []interface{} {


    bytes := []interface{}{}


    for _, d := range data {
        b, err := json.Marshal(&d)
        if err != nil {
            log.Fatal(context.Background(), err)
        }


        bytes = append(bytes, b)
    }


    return bytes
}


type addTimestampFn struct {
    Min beam.EventTime `json:"min"`
}


func (f *addTimestampFn) ProcessElement(t *Sensor) (beam.EventTime, *Sensor) {


    timestamp := mtime.FromTime(time.Unix(t.Timestamp, 0))
    return timestamp, t
}


func printValue(key string, val float64) {
    log.Infof(context.Background(), "Key %s, Val: %f", key, val)
}


func extractSensorData(msg []byte) *Sensor {


    ctx := context.Background()


    data := &Sensor{}
    if err := json.Unmarshal(msg, data); err != nil {
        log.Fatal(ctx, err)
    }


    return data
}


func extractKV(data *Sensor) (string, int) {
    return data.Name, data.Value
}


{code}
This worked like a charm: the warnings were gone and I can see the prints on 
Stackdriver.

I then added pubsub back to this code instead of emitting the data with 
beam.Create, and once again it did not work and the warnings were back.

 
{code:java}
package main


import (
    "context"
    "encoding/json"
    "flag"
    "fmt"
    "reflect"
    "time"


    "cloud.google.com/go/pubsub"
    "github.com/apache/beam/sdks/go/pkg/beam"
    "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
    "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
    "github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
    "github.com/apache/beam/sdks/go/pkg/beam/log"
    "github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
    "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
    "github.com/apache/beam/sdks/go/pkg/beam/util/pubsubx"
    "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
)


var (
    input = flag.String("input", "iot-data", "Pubsub input topic.")
)


type Sensor struct {
    Name      string
    Value     int
    Timestamp int64
}


var (
    now  = time.Now()
    data = []Sensor{
        {Name: "temperature", Value: 20, Timestamp: now.Unix()},
        {Name: "temperature", Value: 22, Timestamp: now.Add(1 * 
time.Second).Unix()},
        {Name: "temperature", Value: 24, Timestamp: now.Add(2 * 
time.Second).Unix()},
        {Name: "temperature", Value: 26, Timestamp: now.Add(3 * 
time.Second).Unix()},
        {Name: "humidity", Value: 10, Timestamp: now.Unix()},
        {Name: "humidity", Value: 12, Timestamp: now.Add(1 * 
time.Second).Unix()},
        {Name: "humidity", Value: 14, Timestamp: now.Add(2 * 
time.Second).Unix()},
        {Name: "humidity", Value: 16, Timestamp: now.Add(3 * 
time.Second).Unix()},
    }
)


func init() {
    beam.RegisterType(reflect.TypeOf((*addTimestampFn)(nil)).Elem())
    beam.RegisterType(reflect.TypeOf((*Sensor)(nil)).Elem())
}


func main() {
    flag.Parse()
    beam.Init()


    ctx := context.Background()
    project := gcpopts.GetProject(ctx)


    log.Infof(ctx, "Publishing %v messages to: %v", len(data), *input)


    defer pubsubx.CleanupTopic(ctx, project, *input)
    sub, err := Publish(ctx, project, *input, data...)
    if err != nil {
        log.Fatal(ctx, err)
    }


    log.Infof(ctx, "Running streaming sensor data with subscription: %v", sub.ID())


    p := beam.NewPipeline()
    s := p.Root()


    // Reads sensor data from pubsub
    // Returns PCollection<[]byte>
    col := pubsubio.Read(s, project, *input, &pubsubio.ReadOptions{Subscription: sub.ID()})


    // // Emits fixed sensor data
    // // Returns PCollection<[]byte>
    // col := beam.Create(s, Bytes(data...)...)


    // Transforms incoming bytes from pubsub to a *Sensor
    // Accepts PCollection<[]byte>
    // Returns PCollection<*Sensor>
    sensor := beam.ParDo(s, extractSensorData, col)


    // Injects the tag timestamp as the timestamp to be used by the window operation
    // Accepts PCollection<*Sensor>
    // Returns PCollection<*Sensor> with window timestamps
    tsSensor := beam.ParDo(s, &addTimestampFn{Min: mtime.Now()}, sensor)


    // Transforms a *Sensor into a string,int key value
    // where the key is the sensor name and the value is the sensor reading
    // Accepts PCollection<*Sensor>
    // Returns PCollection<KV<string,int>>
    data := beam.ParDo(s, extractKV, tsSensor)


    // Split into windows of 2 seconds every second
    window := beam.WindowInto(s, window.NewSlidingWindows(time.Second, 2*time.Second), data)


    // Calculate running average per sensor
    //
    // Accepts PCollection<KV<string,int>>
    // Returns PCollection<KV<string,int>>
    avg := stats.MeanPerKey(s, window)


    // Prints the values for each window average
    beam.ParDo0(s, printValue, avg)


    if err := beamx.Run(context.Background(), p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }
}


func Bytes(data ...Sensor) []interface{} {


    bytes := []interface{}{}


    for _, d := range data {
        b, err := json.Marshal(&d)
        if err != nil {
            log.Fatal(context.Background(), err)
        }


        bytes = append(bytes, b)
    }


    return bytes
}


type addTimestampFn struct {
    Min beam.EventTime `json:"min"`
}


func (f *addTimestampFn) ProcessElement(t *Sensor) (beam.EventTime, *Sensor) {


    timestamp := mtime.FromTime(time.Unix(t.Timestamp, 0))
    return timestamp, t
}


func printValue(key string, val float64) {
    log.Infof(context.Background(), "Key %s, Val: %f", key, val)
}


func extractSensorData(msg []byte) *Sensor {


    ctx := context.Background()


    data := &Sensor{}
    if err := json.Unmarshal(msg, data); err != nil {
        log.Fatal(ctx, err)
    }


    return data
}


func extractKV(data *Sensor) (string, int) {
    return data.Name, data.Value
}


func Publish(ctx context.Context, project, topic string, messages ...Sensor) (*pubsub.Subscription, error) {
    client, err := pubsub.NewClient(ctx, project)
    if err != nil {
        return nil, err
    }
    t, err := pubsubx.EnsureTopic(ctx, client, topic)
    if err != nil {
        return nil, err
    }
    sub, err := pubsubx.EnsureSubscription(ctx, client, topic, fmt.Sprintf("%v.sub.%v", topic, time.Now().Unix()))
    if err != nil {
        return nil, err
    }


    for _, msg := range messages {
        bytes, err := json.Marshal(&msg)
        if err != nil {
            return nil, fmt.Errorf("failed to marshal '%v': %v", msg, err)
        }
        m := &pubsub.Message{
            Data: ([]byte)(bytes),
            // Attributes: ??
        }
        id, err := t.Publish(ctx, m).Get(ctx)
        if err != nil {
            return nil, fmt.Errorf("failed to publish '%v': %v", msg, err)
        }
        log.Infof(ctx, "Published %v with id: %v", msg, id)
    }
    return sub, nil
}


{code}
 

I am not certain what the issue with pubsub is here, but I guess this specific 
issue can be closed for now. Any more info you might have on the pubsub issue 
is welcome.
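
In case it helps with debugging, here is a minimal sketch (assuming only the cloud.google.com/go/pubsub client; the project and subscription IDs below are placeholders) for pulling a few messages directly from the subscription, to confirm data is actually arriving before the pipeline reads it:
{code:java}
package main

import (
    "context"
    "fmt"
    "log"
    "sync/atomic"
    "time"

    "cloud.google.com/go/pubsub"
)

func main() {
    ctx := context.Background()

    // Placeholder project ID; use the same project the pipeline runs in.
    client, err := pubsub.NewClient(ctx, "<my-project>")
    if err != nil {
        log.Fatal(err)
    }
    // Placeholder subscription ID; use the one printed by the pipeline at startup.
    sub := client.Subscription("iot-data.sub.<timestamp>")

    // Receive blocks until the context is done, so bound it with a timeout.
    ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()

    var count int64
    err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
        fmt.Printf("got message: %s\n", m.Data)
        atomic.AddInt64(&count, 1)
        m.Ack()
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("received %d messages\n", atomic.LoadInt64(&count))
}
{code}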

 


> Unable to use unlifted combine functions
> ----------------------------------------
>
>                 Key: BEAM-7065
>                 URL: https://issues.apache.org/jira/browse/BEAM-7065
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-go
>    Affects Versions: Not applicable
>         Environment: Google Cloud Dataflow
>            Reporter: Alexandre Thenorio
>            Priority: Major
>
> I have tried running a simple example to calculate a running average or sum 
> using the `stats` package; however, it does not seem to work.
>  
> Here's a reproducer
>  
> {code:java}
> package main
> import (
>     "context"
>     "encoding/json"
>     "flag"
>     "fmt"
>     "time"
>     "cloud.google.com/go/pubsub"
>     "github.com/apache/beam/sdks/go/pkg/beam"
>     "github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
>     "github.com/apache/beam/sdks/go/pkg/beam/log"
>     "github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
>     "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
>     "github.com/apache/beam/sdks/go/pkg/beam/util/pubsubx"
>     "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
>     "github.com/apache/beam/sdks/go/pkg/beam/x/debug"
> )
> var (
>     input = flag.String("input", "iot-data", "Pubsub input topic.")
> )
> type sensor struct {
>     name  string
>     value int
> }
> var (
>     data = []sensor{
>         {name: "temperature", value: 24},
>         {name: "humidity", value: 10},
>         {name: "temperature", value: 20},
>         {name: "temperature", value: 22},
>         {name: "humidity", value: 14},
>         {name: "humidity", value: 18},
>     }
> )
> func main() {
>     flag.Parse()
>     beam.Init()
>     ctx := context.Background()
>     project := gcpopts.GetProject(ctx)
>     log.Infof(ctx, "Publishing %v messages to: %v", len(data), *input)
>     defer pubsubx.CleanupTopic(ctx, project, *input)
>     sub, err := Publish(ctx, project, *input, data...)
>     if err != nil {
>         log.Fatal(ctx, err)
>     }
>     log.Infof(ctx, "Running streaming sensor data with subscription: %v", 
> sub.ID())
>     p := beam.NewPipeline()
>     s := p.Root()
>     // Reads sensor data from pubsub
>     // Returns PCollection<[]byte>
>     col := pubsubio.Read(s, project, *input, 
> &pubsubio.ReadOptions{Subscription: sub.ID()})
>     // Transforms incoming bytes from pubsub to a string,int key value
>     // where the key is the sensor name and the value is the sensor reading
>     // Accepts PCollection<[]byte>
>     // Returns PCollection<KV<string,int>>
>     data := beam.ParDo(s, extractSensorData, col)
>     // Calculate running average per sensor
>     //
>     // Accepts PCollection<KV<string,int>>
>     // Returns PCollection<KV<string,int>>
>     sum := stats.MeanPerKey(s, data)
>     debug.Print(s, sum)
>     if err := beamx.Run(context.Background(), p); err != nil {
>         log.Exitf(ctx, "Failed to execute job: %v", err)
>     }
> }
> func extractSensorData(msg []byte) (string, int) {
>     ctx := context.Background()
>     data := &sensor{}
>     if err := json.Unmarshal(msg, data); err != nil {
>         log.Fatal(ctx, err)
>     }
>     return data.name, data.value
> }
> func Publish(ctx context.Context, project, topic string, messages ...sensor) (*pubsub.Subscription, error) {
>     client, err := pubsub.NewClient(ctx, project)
>     if err != nil {
>         return nil, err
>     }
>     t, err := pubsubx.EnsureTopic(ctx, client, topic)
>     if err != nil {
>         return nil, err
>     }
>     sub, err := pubsubx.EnsureSubscription(ctx, client, topic, 
> fmt.Sprintf("%v.sub.%v", topic, time.Now().Unix()))
>     if err != nil {
>         return nil, err
>     }
>     for _, msg := range messages {
>         s := &sensor{}
>         bytes, err := json.Marshal(s)
>         if err != nil {
>             return nil, fmt.Errorf("failed to unmarshal '%v': %v", msg, err)
>         }
>         m := &pubsub.Message{
>             Data: ([]byte)(bytes),
>             // Attributes: ??
>         }
>         id, err := t.Publish(ctx, m).Get(ctx)
>         if err != nil {
>             return nil, fmt.Errorf("failed to publish '%v': %v", msg, err)
>         }
>         log.Infof(ctx, "Published %v with id: %v", msg, id)
>     }
>     return sub, nil
> }
> {code}
>  
> I ran this code in the following way
>  
> {noformat}
> go run . --project="<my-project>" --runner dataflow  --staging_location 
> gs://<my-gs-bucket>/binaries/ --temp_location gs://<my-gs-bucket>/tmp/ 
> --region "europe-west1" 
> --worker_harness_container_image=alethenorio/beam-go:v2.11.0{noformat}
>  
>  
>  
> The code publishes to pubsub, then reads the messages back and attempts to 
> call `stats.MeanPerKey` to create a running average.
>  
> When deploying this on cloud dataflow, using a container I built myself from 
> the v2.11.0 version (alethenorio/beam-go:v2.11.0) I get the following error 
> every time
>  
>  
> {noformat}
> Worker panic: Unexpected transform URN: 
> beam:transform:combine_grouped_values:v1goroutine 1 [running]:
> runtime/debug.Stack(0x50, 0x0, 0x0)
> /usr/local/go/src/runtime/debug/stack.go:24 +0x9d
> runtime/debug.PrintStack()
> /usr/local/go/src/runtime/debug/stack.go:16 +0x22
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/init.hook.func1()
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/core/runtime/harness/init/init.go:77
>  +0xac
> panic(0xd4b160, 0xc001172f70)
> /usr/local/go/src/runtime/panic.go:522 +0x1b5
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*builder).makeLink(0xc00078b980,
>  0xc0007aa2c0, 0x18, 0xc0007aa180, 0x17, 0x0, 0x400000000000040, 
> 0xffffffffffffffff, 0x0, 0x0)
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/core/runtime/exec/translate.go:521
>  +0x308f
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*builder).makePCollection(0xc00078b980,
>  0xc0007aa2c0, 0x18, 0xc0011775a0, 0xf, 0x0, 0x0)
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/core/runtime/exec/translate.go:281
>  +0x5ff
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.UnmarshalPlan(0xc00078b2c0,
>  0xc00031a0f0, 0xd49820, 0xff5e10)
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/core/runtime/exec/translate.go:71
>  +0x393
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc000304300,
>  0x1026be0, 0xc00031a0f0, 0xc000774880, 0xc00031a0f0)
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/core/runtime/harness/harness.go:155
>  +0x1ae
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main.func2(0x1026be0,
>  0xc00031a0f0, 0xc000774880)
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/core/runtime/harness/harness.go:114
>  +0x1cf
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main(0x1026be0, 
> 0xc00031a0f0, 0x7ffe0437db7c, 0xf, 0x7ffe0437db9f, 0xf, 0x0, 0x0)
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/core/runtime/harness/harness.go:129
>  +0x786
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/init.hook()
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/core/runtime/harness/init/init.go:86
>  +0xee
> github.com/apache/beam/sdks/go/pkg/beam/core/runtime.Init()
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/core/runtime/init.go:42
>  +0x50
> github.com/apache/beam/sdks/go/pkg/beam.Init(...)
> /home/localuser/go/pkg/mod/github.com/apache/[email protected]+incompatible/sdks/go/pkg/beam/forward.go:111
> main.main()
> /home/localuser/reproducer/main.go:43 +0x8f
> {noformat}
>  
> I realize the Go SDK is not stable yet, but I was uncertain where to post 
> this issue in case the devs are not aware, so I hope this is fine.
> I get the feeling there is some issue with the gRPC requests sending the 
> wrong URN, but I couldn't find where in the code the `v1goroutine` gets set 
> (I think it needs to be just `v1`).
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
