Hi, I am reading the Spark Streaming Kafka code.
In the org.apache.spark.streaming.kafka.KafkaUtils file, the function "createDirectStream" takes the key class, value class, etc. to create ClassTags. However, they are all declared implicit, and I don't understand why. In fact, I cannot find any other overloaded "createDirectStream" that takes implicit parameters. So what are these implicit ClassTags used for? Thank you.

def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
    jssc: JavaStreamingContext,
    keyClass: Class[K],
    valueClass: Class[V],
    keyDecoderClass: Class[KD],
    valueDecoderClass: Class[VD],
    recordClass: Class[R],
    kafkaParams: JMap[String, String],
    fromOffsets: JMap[TopicAndPartition, JLong],
    messageHandler: JFunction[MessageAndMetadata[K, V], R]
  ): JavaInputDStream[R] = {
  implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
  implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
  implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
  implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
  implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
  val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
  createDirectStream[K, V, KD, VD, R](
    jssc.ssc,
    Map(kafkaParams.toSeq: _*),
    Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
    cleanedHandler
  )
}

--
Hao Ren
Data Engineer @ leboncoin
Paris, France