Long term arbitrary stateful processing - best practices

2018-08-29 Thread monohusche
Hi there,

We are currently evaluating Spark to provide streaming analytics over
unbounded data sets: basically, creating aggregations over event-time
windows while supporting both early and late/out-of-order messages. For
streaming aggregations (i.e. SQL operations with well-defined semantics,
e.g. count, avg), I think it is all pretty clear.

My question is more regarding arbitrary stateful processing, especially
mapGroupsWithState.

The scenario is as follows: we have continuously incoming purchase order
messages (via Kafka), each message referencing a user. First, the status
of the referenced user needs to be updated (= active). If there is no
purchase within a year, the user is deemed inactive (i.e. the timeout case).

On top of that, every month we need to calculate an aggregate over these
user states, i.e. the percentage of active users. This deck
(https://www.slideshare.net/databricks/a-deep-dive-into-stateful-stream-processing-in-structured-streaming-with-tathagata-das)
describes a similar scenario from slide 45 onwards.

The issue in our case is how to manage such arbitrary state at scale: on
one hand we have a large number of users, and at the same time the window
over which intermediate state must be kept is very large (a whole year).
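
For reference, this is roughly the shape of the state function we have in
mind so far (just a sketch; the Purchase/UserState classes and field names
are made up for illustration):

import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class Purchase(userId: String, eventTime: Timestamp)
case class UserState(userId: String, lastPurchase: Timestamp, active: Boolean)

def updateUserState(
    userId: String,
    purchases: Iterator[Purchase],
    state: GroupState[UserState]): UserState = {
  if (state.hasTimedOut) {
    // No purchase within a year: mark the user inactive and drop the state.
    val expired = state.get.copy(active = false)
    state.remove()
    expired
  } else {
    // Track the most recent purchase seen so far for this user.
    val latest = (purchases.map(_.eventTime.getTime) ++
      state.getOption.map(_.lastPurchase.getTime)).max
    val updated = UserState(userId, new Timestamp(latest), active = true)
    state.update(updated)
    state.setTimeoutTimestamp(latest, "365 days") // time out one year after the last purchase
    updated
  }
}

// Assuming purchases is a streaming Dataset[Purchase] read from Kafka with a watermark on eventTime:
// purchases
//   .withWatermark("eventTime", "1 hour")
//   .groupByKey(_.userId)
//   .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateUserState)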

Are there alternative patterns for implementing such a requirement?

By the way, I am quite new to Spark Structured Streaming, so primarily
looking for pointers to avoid barking up the wrong tree.

thanks in advance, Nick


 






Is there a plan for official spark-avro/spark-orc read/write library using Data Source V2

2018-08-29 Thread yxchen
The Data Source V2 API is available in Spark 2.3, but currently there is no
official library that uses the Data Source V2 API to read/write Avro or ORC
files -- spark-avro and spark-orc both use Data Source V1. Is there a plan
upstream to implement those readers/writers and make them official
libraries? Or perhaps a Spark-Hive connector to read Avro/ORC tables using
Data Source V2?

Although it is doable to implement such readers/writers on our own,
depending on an official library would make the project easier to maintain.






Re: Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.

2018-08-29 Thread Guillermo Ortiz Fernández
I can't... Do you think it could be a bug in this version, either in Spark
or in Kafka?

On Wed, Aug 29, 2018 at 22:28, Cody Koeninger () wrote:

> Are you able to try a recent version of spark?
>
> On Wed, Aug 29, 2018 at 2:10 AM, Guillermo Ortiz Fernández
>  wrote:
> > I'm using Spark Streaming 2.0.1 with Kafka 0.10, sometimes I get this
> > exception and Spark dies.
> >
> > I couldn't see any error or problem among the machines, anybody has the
> > reason about this error?
> >
> >
> > java.lang.IllegalStateException: This consumer has already been closed.
> > at
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
> > ~[kafka-clients-1.0.0.jar:na]
> > at
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091)
> > ~[kafka-clients-1.0.0.jar:na]
> > at
> >
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:169)
> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> > at
> >
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:188)
> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> > at
> >
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:215)
> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> > ~[scala-library-2.11.11.jar:na]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at scala.Option.orElse(Option.scala:289)
> ~[scala-library-2.11.11.jar:na]
> > at
> >
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> > ~[scala-library-2.11.11.jar:na]
> > at
> >
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> > ~[scala-library-2.11.11.jar:na]
> > at
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > ~[scala-library-2.11.11.jar:na]
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> > ~[scala-library-2.11.11.jar:na]
> > at
> > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> > ~[scala-library-2.11.11.jar:na]
> > at
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> > ~[scala-library-2.11.11.jar:na]
> > at
> >
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at scala.util.Try$.apply(Try.scala:192)
> ~[scala-library-2.11.11.jar:na]
> > at
> >
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> > org.apache.spark.streaming.scheduler.JobGenerator.org
> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
> > 

Re: Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.

2018-08-29 Thread Cody Koeninger
Are you able to try a recent version of spark?

On Wed, Aug 29, 2018 at 2:10 AM, Guillermo Ortiz Fernández
 wrote:
> I'm using Spark Streaming 2.0.1 with Kafka 0.10, sometimes I get this
> exception and Spark dies.
>
> I couldn't see any error or problem among the machines, anybody has the
> reason about this error?
>
>
> java.lang.IllegalStateException: This consumer has already been closed.
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
> ~[kafka-clients-1.0.0.jar:na]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091)
> ~[kafka-clients-1.0.0.jar:na]
> at
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:169)
> ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> at
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:188)
> ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> at
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:215)
> ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> ~[scala-library-2.11.11.jar:na]
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at scala.Option.orElse(Option.scala:289) ~[scala-library-2.11.11.jar:na]
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> ~[scala-library-2.11.11.jar:na]
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> ~[scala-library-2.11.11.jar:na]
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> ~[scala-library-2.11.11.jar:na]
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> ~[scala-library-2.11.11.jar:na]
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> ~[scala-library-2.11.11.jar:na]
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> ~[scala-library-2.11.11.jar:na]
> at
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at scala.util.Try$.apply(Try.scala:192) ~[scala-library-2.11.11.jar:na]
> at
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Re: Spark code to write to MySQL and Hive

2018-08-29 Thread Jacek Laskowski
Hi,

I haven't checked my answer (too lazy today), but I think I know what might
be going on.

tl;dr Use cache to preserve the initial set of rows from MySQL.

After you append new rows, you will have twice as many rows as you had
previously. Correct?

Since newDF references the source table, every time you use it in a
structured query (say, to write it to a table) the table gets re-loaded,
and hence the number of rows changes.

What you should do is execute newDF.cache.count right after val newDF =
mysqlDF.select... so that the data (rows) stays on the executors and won't
get reloaded.
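
For example, a minimal sketch of what I mean (assuming the same spark,
jdbcUrl, table and properties as in your snippet):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)

val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))
newDF.cache()   // keep the computed rows on the executors
newDF.count()   // materialize the cache before any write re-triggers the JDBC read

// Both writes now reuse the cached rows instead of re-reading the MySQL table.
newDF.write.mode(SaveMode.Append).jdbc(jdbcUrl, table, properties)
newDF.write.saveAsTable("cities")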

Hope that helps.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


On Wed, Aug 29, 2018 at 4:59 PM  wrote:

> Sorry, last mail format was not good.
>
>
>
> *println*(*"Going to talk to mySql"*)
>
>
> *// Read table from mySQL.**val *mysqlDF = spark.read.jdbc(jdbcUrl,
> table, properties)
> *println*(*"I am back from mySql"*)
>
> mysqlDF.show()
>
>
> *// Create a new Dataframe with column 'id' increased to avoid Duplicate
> primary keys**val *newDF = mysqlDF.select((*col*(*"id"*) + 10).as(*"id"*),
> *col*(*"country"*), *col*(*"city"*))
> newDF.printSchema()
> newDF.show()
>
>
> *// Insert records into the table.*newDF.write
>   .mode(SaveMode.*Append*)
>   .jdbc(jdbcUrl, table, properties)
>
>
> *// Write to Hive - This Creates a new table.*newDF.write.saveAsTable(
> *"cities"*)
> newDF.show()
>
>
>
>
>
>
>
> Going to talk to mySql
>
> I am back from mySql
>
> +---+--------------+---------+
> | id|       country|     city|
> +---+--------------+---------+
> |  1|           USA|Palo Alto|
> |  2|Czech Republic|     Brno|
> |  3|           USA|Sunnyvale|
> |  4|          null|     null|
> +---+--------------+---------+
>
> root
>  |-- id: long (nullable = false)
>  |-- country: string (nullable = true)
>  |-- city: string (nullable = true)
>
> +---+--------------+---------+
> | id|       country|     city|
> +---+--------------+---------+
> | 11|           USA|Palo Alto|
> | 12|Czech Republic|     Brno|
> | 13|           USA|Sunnyvale|
> | 14|          null|     null|
> +---+--------------+---------+
>
> +---+--------------+---------+
> | id|       country|     city|
> +---+--------------+---------+
> | 11|           USA|Palo Alto|
> | 12|Czech Republic|     Brno|
> | 13|           USA|Sunnyvale|
> | 14|          null|     null|
> | 24|          null|     null|
> | 23|           USA|Sunnyvale|
> | 22|Czech Republic|     Brno|
> | 21|           USA|Palo Alto|
> +---+--------------+---------+
>
>
>
> Thanks,
>
> Ravi
>
>
>
> *From:* ryanda...@gmail.com 
> *Sent:* Wednesday, August 29, 2018 8:19 PM
> *To:* user@spark.apache.org
> *Subject:* Spark code to write to MySQL and Hive
>
>
>
> Hi,
>
>
>
> Can anyone help me to understand what is happening with my code ?
>
>
>
> I wrote a Spark application to read from a MySQL table (that already has 4
> records) and create a new DF by adding 10 to the id field. Then I wanted to
> write the new DF to MySQL as well as to Hive.
>
>
>
> I am surprised to see an additional set of records in Hive! I am not able
> to understand how newDF has records with IDs 21 to 24. I know that a DF is
> immutable; if so, how can it have 4 records at one point and 8 records at a
> later point?
>
>
>
>
> // Read table from mySQL.
> val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
> println("I am back from mySql")
>
> mysqlDF.show()
>
> // Create a new Dataframe with column 'id' increased to avoid duplicate primary keys
> val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))
> newDF.printSchema()
> newDF.show()
>
> // Insert records into the MySQL table.
> newDF.write
>   .mode(SaveMode.Append)
>   .jdbc(jdbcUrl, table, properties)
>
> // Write to Hive - this creates a new table.
> newDF.write.saveAsTable("cities")
> newDF.show()
>
>
>
>
>
> Records already existing in mySql
>
> +---+--------------+---------+
> | id|       country|     city|
> +---+--------------+---------+
> |  1|           USA|Palo Alto|
> |  2|Czech Republic|     Brno|
> |  3|           USA|Sunnyvale|
> |  4|          null|     null|
> +---+--------------+---------+
>
> root
>  |-- id: long (nullable = false)
>  |-- country: string (nullable = true)
>  |-- city: string (nullable = true)
>
> newDF.show()
>
> +---+--------------+---------+
> | id|       country|     city|
> +---+--------------+---------+
> | 11|           USA|Palo Alto|
> | 12|Czech Republic|     Brno|
> | 13|           USA|Sunnyvale|
> | 14|          null|     null|
>
> 

RE: Spark code to write to MySQL and Hive

2018-08-29 Thread ryandam.9
Sorry, the formatting of my last mail was not good.

 


println("Going to talk to mySql")

// Read table from mySQL.
val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
println("I am back from mySql")

mysqlDF.show()

// Create a new Dataframe with column 'id' increased to avoid duplicate primary keys
val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))
newDF.printSchema()
newDF.show()

// Insert records into the table.
newDF.write
  .mode(SaveMode.Append)
  .jdbc(jdbcUrl, table, properties)

// Write to Hive - This Creates a new table.
newDF.write.saveAsTable("cities")
newDF.show()

 

 

 

Going to talk to mySql

I am back from mySql

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
|  1|           USA|Palo Alto|
|  2|Czech Republic|     Brno|
|  3|           USA|Sunnyvale|
|  4|          null|     null|
+---+--------------+---------+

root
 |-- id: long (nullable = false)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
| 11|           USA|Palo Alto|
| 12|Czech Republic|     Brno|
| 13|           USA|Sunnyvale|
| 14|          null|     null|
+---+--------------+---------+

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
| 11|           USA|Palo Alto|
| 12|Czech Republic|     Brno|
| 13|           USA|Sunnyvale|
| 14|          null|     null|
| 24|          null|     null|
| 23|           USA|Sunnyvale|
| 22|Czech Republic|     Brno|
| 21|           USA|Palo Alto|
+---+--------------+---------+

 

Thanks,

Ravi

 

From: ryanda...@gmail.com  
Sent: Wednesday, August 29, 2018 8:19 PM
To: user@spark.apache.org
Subject: Spark code to write to MySQL and Hive

 

Hi,

 

Can anyone help me to understand what is happening with my code ?

 

I wrote a Spark application to read from a MySQL table (that already has 4
records) and create a new DF by adding 10 to the id field. Then I wanted to
write the new DF to MySQL as well as to Hive.

 

I am surprised to see an additional set of records in Hive! I am not able to
understand how newDF has records with IDs 21 to 24. I know that a DF is
immutable; if so, how can it have 4 records at one point and 8 records at a
later point?

 


// Read table from mySQL.
val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
println("I am back from mySql")

mysqlDF.show()

// Create a new Dataframe with column 'id' increased to avoid duplicate primary keys
val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))
newDF.printSchema()
newDF.show()

// Insert records into the MySQL table.
newDF.write
  .mode(SaveMode.Append)
  .jdbc(jdbcUrl, table, properties)

// Write to Hive - this creates a new table.
newDF.write.saveAsTable("cities")
newDF.show()

 

 

Records already existing in mySql

 

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
|  1|           USA|Palo Alto|
|  2|Czech Republic|     Brno|
|  3|           USA|Sunnyvale|
|  4|          null|     null|
+---+--------------+---------+

root
 |-- id: long (nullable = false)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)

newDF.show()

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
| 11|           USA|Palo Alto|
| 12|Czech Republic|     Brno|
| 13|           USA|Sunnyvale|
| 14|          null|     null|
+---+--------------+---------+

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
| 11|           USA|Palo Alto|
| 12|Czech Republic|     Brno|
| 13|           USA|Sunnyvale|
| 14|          null|     null|
| 24|          null|     null|
| 23|           USA|Sunnyvale|
| 22|Czech Republic|     Brno|
| 21|           USA|Palo Alto|
+---+--------------+---------+

 

 

Thanks for your time.

Ravi



Spark code to write to MySQL and Hive

2018-08-29 Thread ryandam.9
Hi,

 

Can anyone help me to understand what is happening with my code ?

 

I wrote a Spark application to read from a MySQL table (that already has 4
records) and create a new DF by adding 10 to the id field. Then I wanted to
write the new DF to MySQL as well as to Hive.

 

I am surprised to see an additional set of records in Hive! I am not able to
understand how newDF has records with IDs 21 to 24. I know that a DF is
immutable; if so, how can it have 4 records at one point and 8 records at a
later point?

 


// Read table from mySQL.
val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
println("I am back from mySql")

mysqlDF.show()

// Create a new Dataframe with column 'id' increased to avoid duplicate primary keys
val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))
newDF.printSchema()
newDF.show()

// Insert records into the MySQL table.
newDF.write
  .mode(SaveMode.Append)
  .jdbc(jdbcUrl, table, properties)

// Write to Hive - this creates a new table.
newDF.write.saveAsTable("cities")
newDF.show()

 

 

Records already existing in mySql

 

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
|  1|           USA|Palo Alto|
|  2|Czech Republic|     Brno|
|  3|           USA|Sunnyvale|
|  4|          null|     null|
+---+--------------+---------+

root
 |-- id: long (nullable = false)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)

newDF.show()

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
| 11|           USA|Palo Alto|
| 12|Czech Republic|     Brno|
| 13|           USA|Sunnyvale|
| 14|          null|     null|
+---+--------------+---------+

+---+--------------+---------+
| id|       country|     city|
+---+--------------+---------+
| 11|           USA|Palo Alto|
| 12|Czech Republic|     Brno|
| 13|           USA|Sunnyvale|
| 14|          null|     null|
| 24|          null|     null|
| 23|           USA|Sunnyvale|
| 22|Czech Republic|     Brno|
| 21|           USA|Palo Alto|
+---+--------------+---------+

 

 

Thanks for your time.

Ravi



Re: Parallelism: behavioural difference in version 1.2 and 2.1!?

2018-08-29 Thread Jeevan K. Srivatsa
Dear Apostolos,

Thanks for the response!

Our version is built on 2.1; the problem is that the state-of-the-art
system I'm trying to compare against is built on version 1.2, so I have to
deal with it.

If I understand the level of parallelism correctly, --total-executor-cores
is set to the number of workers multiplied by the executor cores of each
worker, in this case 32 as well. I use the same script in both cases, so it
shouldn't change.

Thanks and regards,
Jeevan K. Srivatsa


On Wed, 29 Aug 2018 at 16:07, Apostolos N. Papadopoulos <
papad...@csd.auth.gr> wrote:

> Dear Jeevan,
>
> Spark 1.2 is quite old, and If I were you I would go for a newer version.
>
> However, is there a parallelism level (e.g., 20, 30) that works for both
> installations?
>
> regards,
>
> Apostolos
>
>
>
> On 29/08/2018 04:55 μμ, jeevan.ks wrote:
> > Hi,
> >
> > I've two systems. One is built on Spark 1.2 and the other on 2.1. I am
> > benchmarking both with the same benchmarks (wordcount, grep, sort, etc.)
> > with the same data set from S3 bucket (size ranges from 50MB to 10 GB).
> The
> > Spark cluster I made use of is r3.xlarge, 8 instances, 4 cores each, and
> > 28GB RAM. I observed a strange behaviour while running the benchmarks
> and is
> > as follows:
> >
> > - When I ran Spark 1.2 version with default partition number
> > (sc.defaultParallelism), the jobs would take forever to complete. So I
> > changed it to the number of cores, i.e., 32 times 3 = 96. This did a
> magic
> > and the jobs completed quickly.
> >
> > - However, when I tried the above magic number on the version 2.1, the
> jobs
> > are taking forever. Deafult parallelism works better, but not that
> > efficient.
> >
> > I'm having problem to rationalise this and compare both the systems. My
> > question is: what changes were made from 1.2 to 2.1 with respect to
> default
> > parallelism for this behaviour to occur? How can I have both versions
> behave
> > similary on the same software/hardware configuration so that I can
> compare?
> >
> > I'd really appreciate your help on this!
> >
> > Cheers,
> > Jeevan
> >
> >
> >
> >
>
> --
> Apostolos N. Papadopoulos, Associate Professor
> Department of Informatics
> Aristotle University of Thessaloniki
> Thessaloniki, GREECE
> tel: ++0030312310991918
> email: papad...@csd.auth.gr
> twitter: @papadopoulos_ap
> web: http://delab.csd.auth.gr/~apostol
>
>
>
>


Re: Parallelism: behavioural difference in version 1.2 and 2.1!?

2018-08-29 Thread Apostolos N. Papadopoulos

Dear Jeevan,

Spark 1.2 is quite old, and if I were you I would go for a newer version.

However, is there a parallelism level (e.g., 20, 30) that works for both 
installations?


regards,

Apostolos



On 29/08/2018 04:55 μμ, jeevan.ks wrote:

Hi,

I have two systems: one is built on Spark 1.2 and the other on 2.1. I am
benchmarking both with the same benchmarks (wordcount, grep, sort, etc.)
and the same data set from an S3 bucket (sizes range from 50MB to 10GB).
The Spark cluster I used is r3.xlarge, 8 instances, 4 cores each, and
28GB RAM. I observed a strange behaviour while running the benchmarks,
as follows:

- When I ran the Spark 1.2 version with the default partition number
(sc.defaultParallelism), the jobs would take forever to complete. So I
changed it to three times the number of cores, i.e., 32 times 3 = 96. This
worked like magic and the jobs completed quickly.

- However, when I tried the above magic number on version 2.1, the jobs
take forever. Default parallelism works better, but is not that efficient.

I'm having trouble rationalising this and comparing the two systems. My
question is: what changes were made from 1.2 to 2.1 with respect to default
parallelism that cause this behaviour? How can I make both versions behave
similarly on the same software/hardware configuration so that I can compare?

I'd really appreciate your help on this!

Cheers,
Jeevan






--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://delab.csd.auth.gr/~apostol





Parallelism: behavioural difference in version 1.2 and 2.1!?

2018-08-29 Thread jeevan.ks
Hi,

I have two systems: one is built on Spark 1.2 and the other on 2.1. I am
benchmarking both with the same benchmarks (wordcount, grep, sort, etc.)
and the same data set from an S3 bucket (sizes range from 50MB to 10GB).
The Spark cluster I used is r3.xlarge, 8 instances, 4 cores each, and
28GB RAM. I observed a strange behaviour while running the benchmarks,
as follows:

- When I ran the Spark 1.2 version with the default partition number
(sc.defaultParallelism), the jobs would take forever to complete. So I
changed it to three times the number of cores, i.e., 32 times 3 = 96
(roughly as sketched below). This worked like magic and the jobs completed
quickly.

- However, when I tried the above magic number on version 2.1, the jobs
take forever. Default parallelism works better, but is not that efficient.
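
For reference, this is roughly how I set the partition count for the 1.2
runs (an illustrative sketch only; the app name and S3 path are
placeholders, not the real benchmark code):

import org.apache.spark.{SparkConf, SparkContext}

// 8 instances x 4 cores = 32 cores, multiplied by 3 => 96 partitions.
val conf = new SparkConf()
  .setAppName("wordcount-benchmark")        // placeholder name
  .set("spark.default.parallelism", "96")   // 3x the total core count
val sc = new SparkContext(conf)

// Reading the input with an explicit partition hint as well.
val lines = sc.textFile("s3://bucket/input", 96)  // placeholder path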

I'm having trouble rationalising this and comparing the two systems. My
question is: what changes were made from 1.2 to 2.1 with respect to default
parallelism that cause this behaviour? How can I make both versions behave
similarly on the same software/hardware configuration so that I can compare?

I'd really appreciate your help on this!

Cheers,
Jeevan 






Re: Which Py4J version goes with Spark 2.3.1?

2018-08-29 Thread Gourav Sengupta
Hi,

I think the best option is to use the py4j that comes with Spark: it is
installed automatically with "pip install pyspark", and if you unzip the
Spark download from its site, it is in the SPARK_HOME/python/lib folder.


Regards,
Gourav Sengupta

On Wed, Aug 29, 2018 at 8:00 AM Aakash Basu 
wrote:

> Hi,
>
> Which Py4J version goes with Spark 2.3.1? I have py4j-0.10.7 but throws
> an error because of certain compatibility issues with the Spark 2.3.1.
>
> Error:
>
> [2018-08-29] [06:46:56] [ERROR] - Traceback (most recent call last): File
> "", line 120, in run File
> "/data/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
> line 441, in csv return
> self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
> File
> "/data/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
> line 1257, in __call__ answer, self.gateway_client, self.target_id,
> self.name) File
> "/data/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py",
> line 63, in deco return f(*a, **kw) File
> "/data/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
> line 328, in get_return_value format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o49.csv.
>
> Any help?
>
> Thanks,
> Aakash.
>


Spark udf from external jar without enabling Hive

2018-08-29 Thread Swapnil Chougule
Hi Team,

I am creating a UDF as follows, from an external jar:

val spark = SparkSession.builder.appName("UdfUser")
  .master("local")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("CREATE FUNCTION uppercase AS 'path.package.udf.UpperCase' " +
  "USING JAR '/home/swapnil/udf-jars/spark-udf-1.0.jar'")


Can I do the same without enabling Hive support (i.e. with the in-memory
catalog)? I tried the same but didn't find a solution. Any help is really
appreciated.
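
Concretely, what I tried is the same registration, just without Hive
support (reusing the same class and jar path as above):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("UdfUser")
  .master("local")
  .getOrCreate()   // no enableHiveSupport()

// Same registration as above, now against the default in-memory catalog.
spark.sql("CREATE FUNCTION uppercase AS 'path.package.udf.UpperCase' " +
  "USING JAR '/home/swapnil/udf-jars/spark-udf-1.0.jar'")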

Thanks,
Swapnil


java.lang.OutOfMemoryError: Java heap space - Spark driver.

2018-08-29 Thread Guillermo Ortiz Fernández
I got this error from the Spark driver. It seems that I should increase the
driver memory, although it is already 5g (with 4 cores). It seems weird to
me because I'm not using Kryo or broadcast explicitly in this process, yet
the log contains references to Kryo and broadcast.
How could I figure out the reason for this OutOfMemoryError? Is it normal
to see references to Kryo and broadcasting when I'm not using them?

05:11:19.110 [streaming-job-executor-0] WARN
c.datastax.driver.core.CodecRegistry - Ignoring codec DateRangeCodec
['org.apache.cassandra.db.marshal.DateRangeType' <->
com.datastax.driver.dse.search.DateRange] because it collides with
previously registered codec DateRangeCodec
['org.apache.cassandra.db.marshal.DateRangeType' <->
com.datastax.driver.dse.search.DateRange]
05:11:26.806 [dag-scheduler-event-loop] WARN  org.apache.spark.util.Utils -
Suppressing exception in finally: Java heap space
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
~[na:1.8.0_162]
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[na:1.8.0_162]
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:231)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:231)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
~[lz4-1.3.0.jar:na]
at
net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
~[lz4-1.3.0.jar:na]
at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
~[kryo-3.0.3.jar:na]
at com.esotericsoftware.kryo.io.Output.close(Output.java:191)
~[kryo-3.0.3.jar:na]
at
org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:209)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$1.apply$mcV$sp(TorrentBroadcast.scala:238)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:237)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:107)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:86)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1387)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1012)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:936)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:935)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at scala.collection.immutable.List.foreach(List.scala:392)
[scala-library-2.11.11.jar:na]
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:935)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:873)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1630)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
05:40:53.535 [dse-app-client-thread-pool-0] WARN
c.datastax.driver.core.CodecRegistry - Ignoring codec DateRangeCodec
['org.apache.cassandra.db.marshal.DateRangeType' 

Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.

2018-08-29 Thread Guillermo Ortiz Fernández
I'm using Spark Streaming 2.0.1 with Kafka 0.10. Sometimes I get this
exception and Spark dies.

I couldn't see any error or problem on the machines. Does anybody know the
reason for this error?


java.lang.IllegalStateException: This consumer has already been closed.
at
org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
~[kafka-clients-1.0.0.jar:na]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091)
~[kafka-clients-1.0.0.jar:na]
at
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:169)
~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
at
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:188)
~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
at
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:215)
~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
~[scala-library-2.11.11.jar:na]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at scala.Option.orElse(Option.scala:289) ~[scala-library-2.11.11.jar:na]
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
~[scala-library-2.11.11.jar:na]
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
~[scala-library-2.11.11.jar:na]
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
~[scala-library-2.11.11.jar:na]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
~[scala-library-2.11.11.jar:na]
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
~[scala-library-2.11.11.jar:na]
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
~[scala-library-2.11.11.jar:na]
at
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at scala.util.Try$.apply(Try.scala:192) ~[scala-library-2.11.11.jar:na]
at
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]


Which Py4J version goes with Spark 2.3.1?

2018-08-29 Thread Aakash Basu
Hi,

Which Py4J version goes with Spark 2.3.1? I have py4j-0.10.7, but it throws
an error because of certain compatibility issues with Spark 2.3.1.

Error:

[2018-08-29] [06:46:56] [ERROR] - Traceback (most recent call last): File
"", line 120, in run File
"/data/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
line 441, in csv return
self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
File
"/data/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
line 1257, in __call__ answer, self.gateway_client, self.target_id,
self.name) File
"/data/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py",
line 63, in deco return f(*a, **kw) File
"/data/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
line 328, in get_return_value format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o49.csv.

Any help?

Thanks,
Aakash.