Hi,

I am reading spark streaming Kafka code.

In org.apache.spark.streaming.kafka.KafkaUtils file,
the function "createDirectStream" takes key class, value class, etc to
create classTag.
However, they are all implicit. I don't understand why they are implicit.

In fact, I can not find any other overloaded "createDirectStream" take
implicit parameters.

So what are these implicit ClassTags are used for ? Thank you.

def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
    jssc: JavaStreamingContext,
    keyClass: Class[K],
    valueClass: Class[V],
    keyDecoderClass: Class[KD],
    valueDecoderClass: Class[VD],
    recordClass: Class[R],
    kafkaParams: JMap[String, String],
    fromOffsets: JMap[TopicAndPartition, JLong],
    messageHandler: JFunction[MessageAndMetadata[K, V], R]
  ): JavaInputDStream[R] = {
  implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
  implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
  implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
  implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
  implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
  val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
  createDirectStream[K, V, KD, VD, R](
    jssc.ssc,
    Map(kafkaParams.toSeq: _*),
    Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
    cleanedHandler
  )
}


-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France

Reply via email to