Re: How to properly execute `foreachPartition` in Spark 2.2

2017-12-18 Thread Liana Napalkova
Thanks, Timur.

The problem is that if I run `foreachPartition`, then I cannot `start` the 
streaming query. Or perhaps I am missing something.



Re: How to properly execute `foreachPartition` in Spark 2.2

2017-12-18 Thread Cody Koeninger
You can't create a network connection to Kafka on the driver and then
serialize it to send to the executor. That's likely why you're getting
serialization errors.

Kafka producers are thread safe and designed for use as a singleton.

Use a lazy singleton instance of the producer on the executor, don't pass
it in.
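
A minimal sketch of what that could look like, assuming Spark's ForeachWriter
API on the sink side (the ProducerHolder object, broker address and topic name
below are illustrative, not from the original code):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.ForeachWriter

// Hypothetical helper: one producer per executor JVM, created lazily on first use.
object ProducerHolder {
  lazy val producer: KafkaProducer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }
}

class MySingletonWriter(topic: String) extends ForeachWriter[String] {
  override def open(partitionId: Long, version: Long): Boolean = true
  override def process(row: String): Unit =
    // The producer is resolved on the executor, so nothing Kafka-related is serialized from the driver.
    ProducerHolder.producer.send(new ProducerRecord[String, String](topic, row))
  override def close(errorOrNull: Throwable): Unit = ()
}

The writer itself then only carries the topic string, which serializes without issue.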


Re: How to properly execute `foreachPartition` in Spark 2.2

2017-12-18 Thread Liana Napalkova
If there is no other way, then I will follow this recommendation.




Re: How to properly execute `foreachPartition` in Spark 2.2

2017-12-18 Thread Silvio Fiorito
Couldn’t you readStream from Kafka, do your transformations, map your rows from 
the transformed input into what you need to send to Kafka, then 
writeStream to Kafka?
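
Roughly, the shape would be something like the sketch below (topic names, the
transformation, and the checkpoint path are placeholders, and it assumes the
spark-sql-kafka-0-10 package is on the classpath):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-to-kafka").getOrCreate()
import spark.implicits._

// Read the input topic as a stream of strings.
val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

// Row-level transformation; this filter/map stands in for the
// "conditionally send" logic.
val output = input
  .filter(_.nonEmpty)
  .map(_.toUpperCase)

// Write the result back to Kafka; the sink expects a "value" column.
val query = output.toDF("value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/kafka-to-kafka-checkpoint")
  .start()

query.awaitTermination()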



Re: How to properly execute `foreachPartition` in Spark 2.2

2017-12-18 Thread Timur Shenkao
Spark Dataset / DataFrame has foreachPartition() as well. Its
implementation is much more efficient than the RDD's.
There are plenty of code snippets, e.g.
https://github.com/hdinsight/spark-streaming-data-persistence-examples/blob/master/src/main/scala/com/microsoft/spark/streaming/examples/common/DataFrameExtensions.scala
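
For a batch (non-streaming) Dataset, the per-partition producer pattern from
the Spark 1.6 code could look roughly like the following; the broker settings
and the helper method name are placeholders:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.Dataset

def sendPartitions(ds: Dataset[String], topic: String): Unit =
  ds.foreachPartition { partition: Iterator[String] =>
    // One producer per partition, created on the executor and never serialized from the driver.
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    try {
      partition.foreach(s => producer.send(new ProducerRecord[String, String](topic, s)))
    } finally {
      producer.close()
    }
  }

Note that a streaming Dataset cannot be consumed this way directly (a streaming
query has to be started through writeStream), which is the limitation raised in
the follow-up reply.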


Re: How to properly execute `foreachPartition` in Spark 2.2

2017-12-18 Thread Liana Napalkova
I need to first read from a Kafka queue into a DataFrame. Then I should perform 
some transformations on the data. Finally, for each row in the DataFrame I 
should conditionally apply a KafkaProducer in order to send some data to Kafka.

So, I am both consuming and producing data from/to Kafka.





Re: How to properly execute `foreachPartition` in Spark 2.2

2017-12-18 Thread Silvio Fiorito
Why don’t you just use the Kafka sink for Spark 2.2?

https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html#creating-a-kafka-sink-for-streaming-queries
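
In outline, per that page, the sink end looks roughly like this (the broker
address, topic and checkpoint path below are placeholders); df is assumed to be
a streaming DataFrame that already has a string "value" column:

val query = df
  .selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints/kafka-sink")
  .start()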








How to properly execute `foreachPartition` in Spark 2.2

2017-12-18 Thread Liana Napalkova
Hi,


I wonder how to properly execute `foreachPartition` in Spark 2.2. Below I 
explain the problem in detail. I appreciate any help.


In Spark 1.6 I was doing something similar to this:


DstreamFromKafka.foreachRDD(session => {
  session.foreachPartition { partitionOfRecords =>
    println("Setting the producer.")
    val producer = Utils.createProducer(mySet.value("metadataBrokerList"),
                                        mySet.value("batchSize"),
                                        mySet.value("lingerMS"))
    partitionOfRecords.foreach(s => {
      // ...


However, I cannot find the proper way to do a similar thing in Spark 2.2. I 
tried to write my own class by extending `ForeachWriter`, but I get a Task 
Serialization error when passing the `KafkaProducer`.

// Imports assumed for a self-contained snippet:
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}

class MyTestClass(/* val inputparams: String */) extends Serializable {

  val spark = SparkSession
    .builder()
    .appName("TEST")
    //.config("spark.sql.warehouse.dir", kafkaData)
    .enableHiveSupport()
    .getOrCreate()

  import spark.implicits._

  val df: Dataset[String] = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "true")
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)] // Kafka sends bytes
    .map(_._2)

  val producer = // create KafkaProducer

  // Passing the driver-side producer into the writer is what triggers the serialization error.
  val writer = new MyForeachWriter(producer: KafkaProducer[String, String])

  val query = df
    .writeStream
    .foreach(writer)
    .start

  query.awaitTermination()

  spark.stop()
}

class MyForeachWriter extends ForeachWriter[String] with Serializable {

  var producer: KafkaProducer[String, String] = _

  def this(producer: KafkaProducer[String, String]) {
    this()
    this.producer = producer
  }

  override def process(row: String): Unit = {
    // ...
  }

  override def close(errorOrNull: Throwable): Unit = {}

  override def open(partitionId: Long, version: Long): Boolean = {
    true
  }
}


