gowtham3 opened a new issue, #23723:
URL: https://github.com/apache/beam/issues/23723

   ### What happened?
   
   I'm trying to use apache beam go sdk to listen to messages from kafka. This 
is how I'm trying to use kafkaio library to connect to kafka.
   
   ```
   
   func main() {
        // In order to start creating the pipeline for execution, a Pipeline 
object is needed.
        flag.Parse()
        beam.Init()
        p := beam.NewPipeline()
        s := p.Root()
        ctx := context.Background()
   
        log.Info(ctx, "Starting ")
        //_ = textio.Read(s, "example_input.txt")
   
        messages := kafkaio.Read(s,
                "localhost:8097",
                "localhost:9092",
                []string{"quickstart-events"},
        )
   
        vals := beam.DropKey(s, messages)
        beam.ParDo0(s, &LogFn{}, vals)
        if err := beamx.Run(ctx, p); err != nil {
                log.Fatalf(ctx, "Failed to execute job: %v", err.Error())
        }
   }
   ```
   
   I followed the instructions given in 
https://github.com/apache/beam/blob/master/sdks/go/examples/kafka/taxi.go 
example to write this. I have kafka setup and running locally. As instructed, I 
also have the expansion service running too. But, when I try to run this, I'm 
getting `no root units found` error with the following logs
   
   ```
   2022/10/19 16:02:21 Starting
   2022/10/19 16:02:21 Executing pipeline with the direct runner.
   2022/10/19 16:02:21 Pipeline:
   2022/10/19 16:02:21 Nodes: {1: KV<[]uint8,[]uint8>/KV<bytes,bytes> 
GLO:unbounded}
   {2: []uint8/bytes GLO:unbounded}
   Edges: 1: External [] -> [Out: KV<[]uint8,[]uint8> -> {1: 
KV<[]uint8,[]uint8>/KV<bytes,bytes> GLO:unbounded}]
   2: ParDo [In(Main): KV<X,Y> <- {1: KV<[]uint8,[]uint8>/KV<bytes,bytes> 
GLO:unbounded}] -> [Out: Y -> {2: []uint8/bytes GLO:unbounded}]
   3: ParDo [In(Main): []uint8 <- {2: []uint8/bytes GLO:unbounded}] -> []
   2022/10/19 16:02:21 Failed to execute job: translation failed
           caused by:
   no root units
   panic: Failed to execute job: translation failed
           caused by:
   no root units
   ```
   
   Upon debugging, I found the logic where we're determining the root expects 
an edge with OpType `Impulse` and since the above pipeline graph doesn't 
contain any root nodes, the job failed. I was able to resolve this error by 
just adding a dummy textio.Read to the pipeline (commented line in the code 
shared above). But, then the kafka consumer was not listening to the messages. 
The program ended without any kafka listener listening to the topic. I'm 
guessing this is happening since it won't be part of the execution plan since 
there was no root detected in that part of the pipeline. Am I missing anything? 
Are there any more examples of go sdk using kafka connect?
   
   Note: I was able to connect and listen to kafka messages using the java 
client. So, I suspect it has some issues related to cross language 
communication based on which the kafkaio is implemented.
   
   ### Issue Priority
   
   Priority: 2
   
   ### Issue Component
   
   Component: sdk-go


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to