Is Kafka itself running inside another container? If so, inspect that container to see whether it has a network alias, then add that alias to your /etc/hosts file and map it to 127.0.0.1.
From: Chamikara Jayalath <[email protected]>
Sent: Friday, June 5, 2020 2:58 PM
To: Luke Cwik <[email protected]>
Cc: user <[email protected]>; dev <[email protected]>; Heejong Lee <[email protected]>
Subject: Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

Is it possible that 'localhost:9092' is not available from the Docker environment where the Flink step is executed? Can you try specifying the actual IP address of the node running the Kafka broker?

On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik <[email protected]> wrote:

+dev <[email protected]> +Chamikara Jayalath <[email protected]> +Heejong Lee <[email protected]>

On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk <[email protected]> wrote:

I am unable to read from Kafka and am getting the following warnings & errors when calling kafka.ReadFromKafka() (Python SDK):

WARNING:root:severity: WARN
timestamp { seconds: 1591370012 nanos: 523000000 }
message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1 could not be established. Broker may not be available."
log_location: "org.apache.kafka.clients.NetworkClient"
thread: "18"

Finally, the pipeline fails with:

RuntimeError: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

See the more complete log attached. The relevant code snippet:

consumer_conf = {"bootstrap.servers": "localhost:9092"}
...
kafka.ReadFromKafka(
    consumer_config=consumer_conf,
    topics=[args.topic],
)
...

Also see the full Python script attached. I am using Beam version 2.21.0 and the DirectRunner. With the Flink runner I am also unable to read from the topic. I am using Kafka 2.5.0 and started the broker by following https://kafka.apache.org/quickstart, using the default config/server.properties.
Everything runs locally, and I verified that I can publish & consume from that topic using the confluent_kafka library.

--
Best regards,
Piotr
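The suggestion in the thread above (point the consumer at an address the Docker worker can actually reach, rather than "localhost", which resolves to the container itself) can be sketched as follows. This is a minimal sketch: the address 192.168.1.50 and the helper broker_address are placeholders/assumptions, not part of the original script; substitute the real IP of the machine running the Kafka broker.

```python
import socket


def broker_address(host: str = None, port: int = 9092) -> str:
    """Build a bootstrap.servers value from an externally reachable host.

    If no host is given, fall back to this machine's hostname, which a
    container on the same network can often resolve (an assumption; verify
    this for your setup).
    """
    return f"{host or socket.gethostname()}:{port}"


# Assumed example IP of the node running the Kafka broker:
consumer_conf = {"bootstrap.servers": broker_address("192.168.1.50")}
# broker_address("192.168.1.50") -> "192.168.1.50:9092"
```

The resulting consumer_conf would then be passed to kafka.ReadFromKafka(consumer_config=consumer_conf, topics=[args.topic]) exactly as in the snippet above.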
