gowtham3 opened a new issue, #23723:
URL: https://github.com/apache/beam/issues/23723
### What happened?
I'm trying to use the Apache Beam Go SDK to consume messages from Kafka. This
is how I'm using the kafkaio package to connect to Kafka:
```
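package main

import (
	"context"
	"flag"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

// LogFn (not shown) is the DoFn that logs each message value.

func main() {
	flag.Parse()
	beam.Init()

	// A Pipeline object is needed before any transforms can be added.
	p := beam.NewPipeline()
	s := p.Root()
	ctx := context.Background()
	log.Info(ctx, "Starting ")

	//_ = textio.Read(s, "example_input.txt")
	messages := kafkaio.Read(s,
		"localhost:8097", // expansion service address
		"localhost:9092", // Kafka bootstrap server
		[]string{"quickstart-events"},
	)
	vals := beam.DropKey(s, messages)
	beam.ParDo0(s, &LogFn{}, vals)

	if err := beamx.Run(ctx, p); err != nil {
		log.Fatalf(ctx, "Failed to execute job: %v", err)
	}
}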
```
I wrote this by following the example at
https://github.com/apache/beam/blob/master/sdks/go/examples/kafka/taxi.go.
Kafka is set up and running locally and, as instructed, the expansion service
is running too. But when I run the pipeline, I get a `no root units` error
with the following logs:
```
2022/10/19 16:02:21 Starting
2022/10/19 16:02:21 Executing pipeline with the direct runner.
2022/10/19 16:02:21 Pipeline:
2022/10/19 16:02:21 Nodes: {1: KV<[]uint8,[]uint8>/KV<bytes,bytes> GLO:unbounded}
{2: []uint8/bytes GLO:unbounded}
Edges: 1: External [] -> [Out: KV<[]uint8,[]uint8> -> {1: KV<[]uint8,[]uint8>/KV<bytes,bytes> GLO:unbounded}]
2: ParDo [In(Main): KV<X,Y> <- {1: KV<[]uint8,[]uint8>/KV<bytes,bytes> GLO:unbounded}] -> [Out: Y -> {2: []uint8/bytes GLO:unbounded}]
3: ParDo [In(Main): []uint8 <- {2: []uint8/bytes GLO:unbounded}] -> []
2022/10/19 16:02:21 Failed to execute job: translation failed
caused by:
no root units
panic: Failed to execute job: translation failed
caused by:
no root units
```
Upon debugging, I found that the logic that determines the pipeline roots
expects an edge with OpType `Impulse`. Since the pipeline graph above contains
no such edge, no root is found and the job fails. I was able to get past this
error by adding a dummy textio.Read to the pipeline (the commented-out line in
the code above), but then the Kafka consumer never listened for messages: the
program exited without any consumer subscribed to the topic. My guess is that
the Kafka read is left out of the execution plan because no root was detected
in that part of the pipeline. Am I missing anything? Are there any more
examples of using Kafka from the Go SDK?
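The root check described above can be illustrated with a minimal, simplified sketch. This is not the Beam SDK's actual code: the `Opcode` and `Edge` types and the `findRoots` function are hypothetical stand-ins, and the only behavior modeled is "an edge counts as a root only if its operation is `Impulse`". The first edge list mirrors the graph in the log (External, ParDo, ParDo); the second assumes that the dummy read contributes an `Impulse` edge, which would explain why the error goes away while the External edge is still not a root.

```go
package main

import "fmt"

// Opcode is a hypothetical stand-in for the operation type of a pipeline edge.
type Opcode string

const (
	Impulse  Opcode = "Impulse"
	External Opcode = "External"
	ParDo    Opcode = "ParDo"
)

// Edge is a simplified, hypothetical model of one edge in the pipeline graph.
type Edge struct {
	ID int
	Op Opcode
}

// findRoots returns the IDs of the edges that this simplified check accepts
// as roots. Only Impulse edges qualify; External edges are ignored.
func findRoots(edges []Edge) []int {
	var roots []int
	for _, e := range edges {
		if e.Op == Impulse {
			roots = append(roots, e.ID)
		}
	}
	return roots
}

func main() {
	// Edge shapes taken from the log: 1 External, 2 ParDo, 3 ParDo.
	kafkaOnly := []Edge{{1, External}, {2, ParDo}, {3, ParDo}}
	if len(findRoots(kafkaOnly)) == 0 {
		fmt.Println("kafka only: no root units")
	}

	// Assumption: the dummy read adds an Impulse edge (ID 4 is made up).
	withDummyRead := append(kafkaOnly, Edge{4, Impulse})
	fmt.Println("with dummy read, roots:", findRoots(withDummyRead))
}
```

In this model the second case reports edge 4 as the only root, so translation would succeed while nothing upstream of the External edge is ever scheduled, which matches the behavior observed with the dummy textio.Read.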
Note: I was able to connect and consume Kafka messages using the Java client,
so I suspect the problem lies in the cross-language communication that kafkaio
is built on.
### Issue Priority
Priority: 2
### Issue Component
Component: sdk-go