I have spent a lot of time trying to figure out the following problem. I need to consume messages from a topic on a remote Kafka cluster using Scala and Spark. The Kafka port on the remote machine is set to `7072`, not the default `9092`. The following versions are installed on the remote machine:
1. Kafka 0.10.1.0
2. Scala 2.11

This means that I have to pass the broker list (with the port `7072`) from Scala to the remote Kafka, because otherwise it will try to use the default port. The problem is that, according to the logs, the parameter `bootstrap.servers` cannot be recognized by the remote machine. I also tried renaming this parameter to `metadata.broker.list`, `broker.list` and `listeners`, but the same error always appears in the logs, e.g. `Property bootstrap.servers is not valid`, and then the default port `9092` is used (and the messages are obviously not consumed).

Using telnet, I checked that I can reach the remote Kafka from the EMR machine on Amazon Cloud. I also checked that the name of the Kafka topic is correct (I can consume messages from the terminal using `curl` and the REST API; it only fails in Scala).

In the POM file I use the following dependencies for Kafka and Spark:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>

I use Scala 2.10, not 2.11.
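The telnet check mentioned above can also be run from the JVM, which may help when telnet is not installed on the EMR nodes. The following is only a minimal sketch: the names `BrokerReachability`, `parseBrokers` and `canConnect` and the 3-second timeout are made up for illustration and are not part of any Kafka or Spark API. An open TCP port only shows that the socket is reachable; it says nothing about which host and port the broker advertises to its clients.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper, not part of Kafka or Spark.
object BrokerReachability {

  // Parse "host:port,host:port" into (host, port) pairs.
  // Entries without a valid port in 1..65535 are silently dropped.
  def parseBrokers(brokerList: String): Seq[(String, Int)] =
    brokerList.split(",").toSeq.map(_.trim).filter(_.nonEmpty).flatMap { entry =>
      entry.lastIndexOf(':') match {
        case -1 => None
        case i =>
          Try(entry.substring(i + 1).toInt).toOption
            .filter(port => port > 0 && port <= 65535)
            .map(port => (entry.substring(0, i), port))
      }
    }

  // True if a TCP connection to host:port can be opened within timeoutMs.
  def canConnect(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: IOException => false // refused, timed out, or unknown host
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Assumed default; pass the real broker list as the first argument.
    val brokers = parseBrokers(args.headOption.getOrElse("localhost:7072"))
    brokers.foreach { case (host, port) =>
      println(s"$host:$port reachable=${canConnect(host, port)}")
    }
  }
}
```

Running it on the EMR machine with the remote broker list as the only argument prints one line per broker.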
This is my Scala code for the Kafka consumer (*it works absolutely fine if I use my own Kafka installed in Amazon Cloud, where I have the EMR machines and where Kafka uses the port `9092`*):

    val testTopicMap = testTopic.split(",").map((_, kafkaNumThreads.toInt)).toMap

    val kafkaParams = Map[String, String](
      "broker.list" -> "XXX.XX.XXX.XX:7072",
      "zookeeper.connect" -> "XXX.XX.XXX.XX:2181",
      "group.id" -> "test",
      "zookeeper.connection.timeout.ms" -> "10000",
      "auto.offset.reset" -> "smallest")

    val testEvents: DStream[String] =
      KafkaUtils
        .createStream[String, String, StringDecoder, StringDecoder](
          ssc,
          kafkaParams,
          testTopicMap,
          StorageLevel.MEMORY_AND_DISK_SER_2
        ).map(_._2)

I was reading this documentation (https://kafka.apache.org/documentation/#brokerconfigs), but it looks like everything I did is correct. Should I use some other Kafka client API (another Maven dependency)?

*UPDATE #1:*

I also tried the Direct Stream (without Zookeeper), but it runs into an error:

    val testTopicMap = testTopic.split(",").toSet

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072",
      "bootstrap.servers" -> "XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072",
      "auto.offset.reset" -> "smallest")

    val testEvents = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, testTopicMap)
      .map(_._2)

    testEvents.print()

The error in the logs:

    17/01/02 12:23:15 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.

*UPDATE #2:*

One of the suggested solutions was to set `the property 'advertised.host.name' as instructed by the comments in the kafka configuration (config/server.properties)`.
Do I understand correctly that `config/server.properties` should be changed on the remote machine where Kafka is installed?

Any suggestion will be highly appreciated.
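For comparison, the parameter maps from the two attempts above can be kept side by side with small helpers. This is only a sketch under stated assumptions: the helper names (`KafkaParamsSketch`, `receiverParams`, `directParams`, `brokerList`) are hypothetical, and the keys are the ones already used in this post. According to the Spark 1.6 Kafka integration guide, the direct stream expects either `metadata.broker.list` or `bootstrap.servers`. The receiver-based stream, as far as I understand, is configured through `zookeeper.connect` and `group.id` and looks the brokers up in ZooKeeper rather than taking a broker list from the client.

```scala
// Hypothetical helpers, only meant to contrast the two parameter maps.
object KafkaParamsSketch {

  // Normalise broker entries into a comma-separated list without blanks.
  def brokerList(brokers: Seq[String]): String =
    brokers.map(_.trim).filter(_.nonEmpty).mkString(",")

  // Parameters for the receiver-based stream (KafkaUtils.createStream):
  // ZooKeeper address and consumer group, no broker list.
  def receiverParams(zkQuorum: String, groupId: String): Map[String, String] =
    Map(
      "zookeeper.connect" -> zkQuorum,
      "group.id" -> groupId,
      "zookeeper.connection.timeout.ms" -> "10000",
      "auto.offset.reset" -> "smallest")

  // Parameters for the direct stream (KafkaUtils.createDirectStream):
  // an explicit broker list, no ZooKeeper address.
  def directParams(brokers: Seq[String]): Map[String, String] =
    Map(
      "metadata.broker.list" -> brokerList(brokers),
      "auto.offset.reset" -> "smallest")

  def main(args: Array[String]): Unit = {
    // Placeholder addresses, as in the post.
    println(receiverParams("XXX.XX.XXX.XX:2181", "test"))
    println(directParams(Seq("XXX.XX.XXX.XX:7072")))
  }
}
```

The resulting maps can be passed as `kafkaParams` to the corresponding `KafkaUtils` call. Whether the brokers then answer on the advertised address is a separate question that depends on the remote `config/server.properties`.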