yasser-chihab opened a new issue, #29741:
URL: https://github.com/apache/beam/issues/29741
### What happened?
We are exploring GCP Dataflow with the Apache Beam Go SDK. A pipeline that
processes events from a Pub/Sub subscription runs without issues when Dataflow
is the runner. When we try to streamline the local development environment
with the Pub/Sub emulator and the DirectRunner, the same pipeline fails.
```
package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"strings"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

var gcpProject = "project-id"

// BqRow is the schema of the rows written to BigQuery.
type BqRow struct {
	Date      string
	Line      string
	WordCount int
}

// writeToBQ converts each (line, count) pair to a BqRow and writes it to table.
func writeToBQ(s beam.Scope, table string, wordCountLine beam.PCollection) {
	bqRows := beam.ParDo(s, func(elem string, count int) BqRow {
		return BqRow{Date: time.Now().String(), Line: elem, WordCount: count}
	}, wordCountLine)
	bigqueryio.Write(s, gcpProject, table, bqRows)
}

// filterEmptyLines drops zero-length lines.
func filterEmptyLines(line string, emit func(string)) {
	if len(line) > 0 {
		emit(line)
	}
}

// wordCountLine emits the original line with its space-separated word count.
func wordCountLine(line string, emit func(string, int)) {
	trimmedLine := strings.TrimSpace(line)
	words := strings.Split(trimmedLine, " ")
	emit(line, len(words))
}

// convertPubsubMsg turns a raw Pub/Sub payload into a string.
func convertPubsubMsg(msg []byte) string {
	return string(msg)
}

func main() {
	ctx := context.Background()

	// Toggle between the local setup and Dataflow.
	os.Setenv("local", "true")
	if local := os.Getenv("local"); local == "true" {
		flag.Set("runner", "DirectRunner")
		flag.Set("streaming", "true")
	} else {
		flag.Set("runner", "dataflow")
		flag.Set("project", gcpProject)
		flag.Set("region", "us-east1")
		flag.Set("staging_location", "gs://stream-words/binaries")
	}

	pubsubSubId := "streamWord-sub"
	pubsubTopic := "streamWords"
	BQDatasetTable := "project-id:streamWords.stats"

	flag.Parse()
	beam.Init()

	// Pub/Sub client.
	pubClient, err := pubsub.NewClient(ctx, "gcp-project-id")
	if err != nil {
		fmt.Print(err)
	}
	defer pubClient.Close()

	// Create the Pipeline object and root scope.
	pipeline, scope := beam.NewPipelineWithRoot()

	// Read the stream of messages from Pub/Sub.
	streamWords := pubsubio.Read(scope, gcpProject, pubsubTopic,
		&pubsubio.ReadOptions{Subscription: pubsubSubId})

	// Create fixed windows of 10s.
	windowedStream := beam.WindowInto(scope,
		window.NewFixedWindows(time.Second*10), streamWords)

	// Convert each Pub/Sub message to a string.
	msg := beam.ParDo(scope, convertPubsubMsg, windowedStream)

	// Filter empty lines.
	nonEmptyLines := beam.ParDo(scope, filterEmptyLines, msg)

	// Count the words in each line.
	wordCountLine := beam.ParDo(scope, wordCountLine, nonEmptyLines)

	// Write to BigQuery (the original listing passed an undefined `table` here).
	writeToBQ(scope, BQDatasetTable, wordCountLine)
	debug.Print(scope, wordCountLine)

	// Run the Beam pipeline.
	if err := beamx.Run(ctx, pipeline); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
```
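Independently of any runner, the two element-wise functions in the listing are plain Go functions that take an emit callback, so their logic can be exercised in memory. The following is a minimal sketch of that idea, not part of the reported pipeline: `lineCount` and `runLocally` are hypothetical names introduced here, and the two functions are copied in so the file compiles on its own.

```go
package main

import (
	"fmt"
	"strings"
)

// filterEmptyLines is copied from the pipeline listing.
func filterEmptyLines(line string, emit func(string)) {
	if len(line) > 0 {
		emit(line)
	}
}

// wordCountLine is copied from the pipeline listing.
func wordCountLine(line string, emit func(string, int)) {
	trimmedLine := strings.TrimSpace(line)
	words := strings.Split(trimmedLine, " ")
	emit(line, len(words))
}

// lineCount is a hypothetical helper type that holds one emitted pair.
type lineCount struct {
	Line  string
	Count int
}

// runLocally chains the two functions over in-memory input, standing in
// for the two ParDo steps. It involves no Beam runner, windowing, or I/O.
func runLocally(lines []string) []lineCount {
	var out []lineCount
	for _, l := range lines {
		filterEmptyLines(l, func(kept string) {
			wordCountLine(kept, func(line string, n int) {
				out = append(out, lineCount{Line: line, Count: n})
			})
		})
	}
	return out
}

func main() {
	input := []string{"hello beam", "", "  one two three  "}
	for _, r := range runLocally(input) {
		fmt.Printf("%q -> %d\n", r.Line, r.Count)
	}
}
```

This only checks the per-element logic; it says nothing about how a given runner translates the Pub/Sub read step.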
Running this pipeline with the DirectRunner and the Pub/Sub emulator fails
with the following error:
```
2023/12/13 09:36:27 Executing pipeline with the direct runner.
2023/12/13 09:36:27 Pipeline:
2023/12/13 09:36:27 Nodes: {1: []uint8/bytes GLO:unbounded}
{2: []uint8/bytes FIX[10s]:unbounded}
{3: string/string FIX[10s]:unbounded}
{4: string/string FIX[10s]:unbounded}
{5: KV<string,int>/KV<string,int[varintz]> FIX[10s]:unbounded}
{6: KV<string,int>/KV<string,int[varintz]> FIX[10s]:unbounded}
Edges: 1: External [] -> [Out: []uint8 -> {1: []uint8/bytes GLO:unbounded}]
2: WindowInto [In(Main): []uint8 <- {1: []uint8/bytes GLO:unbounded}] ->
[Out: []uint8 -> {2: []uint8/bytes FIX[10s]:unbounded}]
3: ParDo [In(Main): []uint8 <- {2: []uint8/bytes FIX[10s]:unbounded}] ->
[Out: string -> {3: string/string FIX[10s]:unbounded}]
4: ParDo [In(Main): string <- {3: string/string FIX[10s]:unbounded}] ->
[Out: string -> {4: string/string FIX[10s]:unbounded}]
5: ParDo [In(Main): string <- {4: string/string FIX[10s]:unbounded}] ->
[Out: KV<string,int> -> {5: KV<string,int>/KV<string,int[varintz]>
FIX[10s]:unbounded}]
6: ParDo [In(Main): KV<X,Y> <- {5: KV<string,int>/KV<string,int[varintz]>
FIX[10s]:unbounded}] -> [Out: KV<X,Y> -> {6:
KV<string,int>/KV<string,int[varintz]> FIX[10s]:unbounded}]
2023/12/13 09:36:27 Failed to execute job: translation failed
caused by:
no root units
exit status 1
```
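For the local setup described above, one way to switch between the emulator and Dataflow is to key off `PUBSUB_EMULATOR_HOST`, the environment variable the Pub/Sub client libraries read to find the emulator, and to check the error returned by each flag assignment instead of discarding it. The sketch below is an illustration under assumptions: it uses a standalone `flag.FlagSet` in place of the flags that the Beam packages register, and `newPipelineFlags`, `runnerFlags`, and `applyFlags` are hypothetical helper names. It does not address the `no root units` failure itself.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// newPipelineFlags builds a standalone flag set that mirrors the flag names
// used in the report. It is a stand-in for the flags Beam registers.
func newPipelineFlags() *flag.FlagSet {
	fs := flag.NewFlagSet("pipeline", flag.ContinueOnError)
	fs.String("runner", "", "pipeline runner")
	fs.String("project", "", "GCP project ID")
	fs.String("region", "", "GCP region")
	fs.String("staging_location", "", "GCS staging path")
	return fs
}

// runnerFlags picks flag values for local or Dataflow execution. A non-empty
// emulator host is treated as "run locally" (an assumption of this sketch).
func runnerFlags(emulatorHost, project string) map[string]string {
	if emulatorHost != "" {
		return map[string]string{"runner": "DirectRunner"}
	}
	return map[string]string{
		"runner":           "dataflow",
		"project":          project,
		"region":           "us-east1",
		"staging_location": "gs://stream-words/binaries",
	}
}

// applyFlags sets every value on fs and returns the first error, so an
// undefined flag name is reported instead of being silently ignored.
func applyFlags(fs *flag.FlagSet, values map[string]string) error {
	for name, value := range values {
		if err := fs.Set(name, value); err != nil {
			return fmt.Errorf("setting -%s: %w", name, err)
		}
	}
	return nil
}

func main() {
	fs := newPipelineFlags()
	values := runnerFlags(os.Getenv("PUBSUB_EMULATOR_HOST"), "project-id")
	if err := applyFlags(fs, values); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("runner:", fs.Lookup("runner").Value)
}
```

With this shape, starting the program with `PUBSUB_EMULATOR_HOST` set selects the local branch, and leaving it unset selects the Dataflow branch, without editing the source between runs.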
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [ ] Component: Java SDK
- [X] Component: Go SDK
- [ ] Component: Typescript SDK
- [X] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [X] Component: Google Cloud Dataflow Runner