Hi All,
Recently I started migrating the code from kafka08 to kafka010.
In kafka08, the *topics* argument of createStream sets the number of
partitions to consume for each topic:
def createStream(
    ssc: StreamingContext,
    zkQuorum: String,
    groupId: String,
    topics: Map[String, Int],
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[(String, String)]
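To illustrate the kafka08 usage described above, here is a minimal sketch of such a call. The topic names, the per-topic counts, the ZooKeeper address, the group id and the object name are all placeholders chosen for illustration, and it assumes the spark-streaming-kafka-0-8 artifact is on the classpath.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical example object; not taken from the real job.
object Kafka08TopicsExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka08-topics-example").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Placeholder values: topic name -> per-topic count passed to createStream.
    val topics: Map[String, Int] = Map("topicA" -> 2, "topicB" -> 4)

    // Receiver-based stream of (key, value) pairs; storageLevel keeps its default.
    val stream = KafkaUtils.createStream(ssc, "localhost:2181", "example-group", topics)

    // Print the message values of each batch.
    stream.map(_._2).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```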
How do I pass this configuration with kafka010?
Below is my sample code for kafka010; I cannot find any way or API to set this parameter:
val kafkaParams = Map[String, Object](
  "group.id" -> groupId,
  "bootstrap.servers" -> bootstrapServer,
  "value.deserializer" -> classOf[StringDeserializer],
  "key.deserializer" -> classOf[StringDeserializer])

val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicArr.toSet, kafkaParams))
Regards
Sandeep Katta