yasser-chihab opened a new issue, #29741:
URL: https://github.com/apache/beam/issues/29741

   ### What happened?
   
   **What happened?**
   
   We are exploring GCP Dataflow together with the Apache Beam Go SDK. A 
pipeline that processes events from a Pub/Sub subscription works without issues 
when Dataflow is the runner, but when we try to streamline local development 
with the Pub/Sub emulator and the DirectRunner, the pipeline fails with the 
error shown below.
   
   ```
   package main
   
   import (
        "context"
        "flag"
        "fmt"
        "os"
        "strings"
        "time"
   
        "cloud.google.com/go/pubsub"
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
   )
   
   // gcpProject is the GCP project ID used for the Dataflow job options, the
   // Pub/Sub read and the BigQuery write.
   var (
           gcpProject = "project-id"
   )

   // BqRow is one row written to BigQuery: a timestamp string taken when the
   // row is built, the input line, and the number of words in that line.
   type BqRow struct {
           Date, Line string
           WordCount  int
   }
   
   // writeToBQ converts each (line, word count) pair in wordCountLine into a
   // BqRow and writes the rows with bigqueryio.Write to the given table in
   // project gcpProject.
   //
   // table is passed through unchanged to bigqueryio.Write.
   func writeToBQ(s beam.Scope, table string, wordCountLine beam.PCollection) {
           bqRows := beam.ParDo(s, bqRowFromWordCount, wordCountLine)
           bigqueryio.Write(s, gcpProject, table, bqRows)
   }

   // bqRowFromWordCount builds the BigQuery row for one (line, count) pair.
   // It is a named top-level function rather than an anonymous closure so the
   // DoFn is reusable and can be tested on its own.
   //
   // Date is formatted explicitly as RFC 3339 in UTC. time.Time.String is
   // documented as a debugging format and appends a monotonic-clock reading
   // (e.g. "m=+0.0012"), which should not end up in stored data.
   func bqRowFromWordCount(line string, count int) BqRow {
           return BqRow{
                   Date:      time.Now().UTC().Format(time.RFC3339),
                   Line:      line,
                   WordCount: count,
           }
   }
   
   // filterEmptyLines forwards line to emit unless it is the empty string.
   // Whitespace-only lines are not treated as empty and are forwarded.
   func filterEmptyLines(line string, emit func(string)) {
           if line == "" {
                   return
           }
           emit(line)
   }
   
   // wordCountLine emits the line unchanged together with the number of
   // whitespace-separated words it contains.
   //
   // strings.Fields is used instead of splitting on a single space. Runs of
   // spaces, tabs and leading/trailing whitespace therefore no longer produce
   // empty "words": previously "a  b" counted 3 and a whitespace-only line
   // counted 1.
   func wordCountLine(line string, emit func(string, int)) {
           emit(line, len(strings.Fields(line)))
   }
   
   // convertPubsubMsg turns a raw Pub/Sub payload into a string by copying its
   // bytes. No UTF-8 validation is performed.
   func convertPubsubMsg(msg []byte) string {
           text := string(msg)
           return text
   }
   
   func main() {
        ctx := context.Background()
        os.Setenv("local", "true")
        if local := os.Getenv("local"); local == "true" {
                flag.Set("runner", "DirectRunner")
                flag.Set("streaming", "true")
        } else {
                flag.Set("runner", "dataflow")
                flag.Set("project", gcpProject)
                flag.Set("region", "us-east1")
                flag.Set("staging_location", "gs://stream-words/binaries")
        }
   
        pubsubSubId := "streamWord-sub"
        pubsubTopic := "streamWords"
        BQDatasetTable := "project-id:streamWords.stats"
   
        flag.Parse()
        beam.Init()
   
        //pubsub client
        pubClient, err := pubsub.NewClient(ctx, "gcp-project-id")
        if err != nil {
                fmt.Print(err)
        }
        defer pubClient.Close()
   
        // Create the Pipeline object and root scope.
        pipeline, scope := beam.NewPipelineWithRoot()
   
   
        // read stream of msg from pubsub
        streamWords := pubsubio.Read(scope, gcpProject, pubsubTopic, 
&pubsubio.ReadOptions{
                Subscription: pubsubSubId})
   
        //create fixed windows of 10s
        windowedStream := beam.WindowInto(scope, 
window.NewFixedWindows(time.Second*10), streamWords)
        
        // convert pubsub msg to string
        msg := beam.ParDo(scope, convertPubsubMsg, windowedStream)
        
        
        //filter empty lines
        nonEmptyLines := beam.ParDo(scope, filterEmptyLines, msg)
        
        //count words in each line
        wordCountLine := beam.ParDo(scope, wordCountLine, nonEmptyLines)
        
        // write to BQ
        writeToBQ(scope, table, wordCountLine)
        debug.Print(scope, wordCountLine)
   
        // Run the beam pipeline
        if err := beamx.Run(ctx, pipeline); err != nil {
                log.Exitf(ctx, "Failed to execute job: %v", err)
        }
   }
   
   ```
   
   When the pipeline above is run with the DirectRunner against the Pub/Sub 
emulator, it fails with the following error:
   
   ```
   2023/12/13 09:36:27 Executing pipeline with the direct runner.
   2023/12/13 09:36:27 Pipeline:
   2023/12/13 09:36:27 Nodes: {1: []uint8/bytes GLO:unbounded}
   {2: []uint8/bytes FIX[10s]:unbounded}
   {3: string/string FIX[10s]:unbounded}
   {4: string/string FIX[10s]:unbounded}
   {5: KV<string,int>/KV<string,int[varintz]> FIX[10s]:unbounded}
   {6: KV<string,int>/KV<string,int[varintz]> FIX[10s]:unbounded}
   Edges: 1: External [] -> [Out: []uint8 -> {1: []uint8/bytes GLO:unbounded}]
   2: WindowInto [In(Main): []uint8 <- {1: []uint8/bytes GLO:unbounded}] -> 
[Out: []uint8 -> {2: []uint8/bytes FIX[10s]:unbounded}]
   3: ParDo [In(Main): []uint8 <- {2: []uint8/bytes FIX[10s]:unbounded}] -> 
[Out: string -> {3: string/string FIX[10s]:unbounded}]
   4: ParDo [In(Main): string <- {3: string/string FIX[10s]:unbounded}] -> 
[Out: string -> {4: string/string FIX[10s]:unbounded}]
   5: ParDo [In(Main): string <- {4: string/string FIX[10s]:unbounded}] -> 
[Out: KV<string,int> -> {5: KV<string,int>/KV<string,int[varintz]> 
FIX[10s]:unbounded}]
   6: ParDo [In(Main): KV<X,Y> <- {5: KV<string,int>/KV<string,int[varintz]> 
FIX[10s]:unbounded}] -> [Out: KV<X,Y> -> {6: 
KV<string,int>/KV<string,int[varintz]> FIX[10s]:unbounded}]
   2023/12/13 09:36:27 Failed to execute job: translation failed
           caused by:
   no root units
   exit status 1
   ```
   
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [X] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [X] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [X] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to