Thanks Michael, that worked, appreciate your help.

From: Michael Armbrust
Sent: Monday, May 15, 2017 11:45 AM
To: Revin Chalil
Cc: User
Subject: Re: Spark SQL DataFrame to Kafka Topic

The foreach sink from that blog post requires that you have a DataFrame with 
two columns in the form of a Tuple2, (String, String), whereas your DataFrame 
has only a single column `payload`.  You could change the KafkaSink to extend 
ForeachWriter[KafkaMessage] and then it would work.
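
A minimal sketch of that change, assuming the KafkaSink takes a topic and a broker list as in the snippet quoted below and that the kafka-clients library is on the classpath. The producer settings and the choice to send only the payload as the record value are illustrative assumptions, not the code from the blog post:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.ForeachWriter

// Mirrors the case class from the quoted snippet.
case class KafkaMessage(payload: String)

// Typed to the Dataset's element type, so that
// Dataset[KafkaMessage].writeStream.foreach(...) accepts it.
class KafkaSink(topic: String, servers: String) extends ForeachWriter[KafkaMessage] {
  // Created in open() on the executor, because KafkaProducer is not serializable.
  private var producer: KafkaProducer[String, String] = _

  override def open(partitionId: Long, version: Long): Boolean = {
    val props = new Properties()
    props.put("bootstrap.servers", servers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producer = new KafkaProducer[String, String](props)
    true // process every partition
  }

  // Assumption: the payload is sent as the record value, with no key.
  override def process(value: KafkaMessage): Unit = {
    producer.send(new ProducerRecord[String, String](topic, value.payload))
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (producer != null) producer.close()
  }
}
```

With a writer typed this way, data.writeStream.foreach(writer) should type-check as long as data is a Dataset[KafkaMessage].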

I'd also suggest you just try the native 
 that is part of Spark 
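
For reference, a rough sketch of what writing through the built-in Kafka sink can look like. It assumes Spark 2.2 or later with the spark-sql-kafka-0-10 package available. The helper name writeToKafka and its parameters are made up for illustration, and the input is assumed to have a string-like `payload` column:

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper: streams the `payload` column to a Kafka topic.
def writeToKafka(
    data: Dataset[_],
    servers: String,
    topic: String,
    checkpointDir: String): StreamingQuery =
  data
    // The Kafka sink expects a `value` column (and optionally `key`).
    .selectExpr("CAST(payload AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", servers)
    .option("topic", topic)
    // The Kafka sink requires a checkpoint location.
    .option("checkpointLocation", checkpointDir)
    .start()
```

No custom ForeachWriter or producer handling is needed in this variant.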

On Sun, May 14, 2017 at 9:31 AM, Revin Chalil wrote:
Hi TD / Michael,

I am trying to use the foreach sink to write to Kafka and followed the post 
from the DBricks blog by Sunil Sitaula. I get the below error 
with DF.writeStream.foreach(writer).outputMode("update").start() when using a 
simple DF

Type mismatch, expected: foreachWriter[Row], actual: KafkaSink

Cannot resolve reference foreach with such signature

Below is the snippet

val data = session
  .option("kafka.bootstrap.servers", KafkaBroker)
  .option("subscribe", InTopic)
  .flatMap(d => {
    var events = AvroHelper.readEvents(d) HdfsEvent) => {
      var payload =
      new KafkaMessage(payload)

case class KafkaMessage(
  payload: String)

This is where I use the foreach

val writer = new KafkaSink("kafka-topic", KafkaBroker)
val query = data.writeStream.foreach(writer).outputMode("update").start()

In this case, it shows –

Type mismatch, expected: foreachWriter[Main.KafkaMesage], actual: Main.KafkaSink

Cannot resolve reference foreach with such signature

Any help is much appreciated. Thank you.

From: Tathagata Das 
Sent: Friday, January 13, 2017 3:31 PM
To: Koert Kuipers
Cc: Peyman Mohajerian; Senthil Kumar; User
Subject: Re: Spark SQL DataFrame to Kafka Topic

Structured Streaming has a foreach sink, where you can essentially do what you 
want with your data. It's easy to create a Kafka producer and write the data 
out to Kafka.

On Fri, Jan 13, 2017 at 8:28 AM, Koert Kuipers wrote:
how do you do this with structured streaming? i see no mention of writing to 

On Fri, Jan 13, 2017 at 10:30 AM, Peyman Mohajerian wrote:
Yes, it is called Structured Streaming:

On Fri, Jan 13, 2017 at 3:32 AM, Senthil Kumar wrote:
Hi Team ,

     Sorry if this question has already been asked in this forum.

Can we ingest data to Apache Kafka Topic from Spark SQL DataFrame ??

Here is my Code which Reads Parquet File :

val sqlContext = new org.apache.spark.sql.SQLContext(sc);

val df = sqlContext.read.parquet("..../temp/*.parquet")


I want to directly ingest df DataFrame to Kafka ! Is there any way to achieve 
this ??


