Long term arbitrary stateful processing - best practices
Hi there,

We are currently evaluating Spark to provide streaming analytics for unbounded data sets: basically, creating aggregations over event-time windows while supporting both early and late/out-of-order messages. For streaming aggregations (= SQL operations with well-defined semantics, e.g. count, avg), I think it is all pretty clear. My question is about arbitrary stateful processing, especially mapGroupsWithState.

The scenario is as follows: we have continuously incoming purchase order messages (via Kafka), and every message references a user. First, the status of the referenced user needs to be updated (= active). If there is no purchase within a year, he/she is deemed inactive (i.e. the timeout case). On top of that, every month we need to calculate an aggregate over these user states, i.e. the percentage of active users. This deck (https://www.slideshare.net/databricks/a-deep-dive-into-stateful-stream-processing-in-structured-streaming-with-tathagata-das) describes a similar scenario from page 45 onward.

The issue in our case is how to manage such arbitrary state at scale. On the one hand, we have a large number of users; on the other hand, the window over which the intermediate state must be kept is very large (= a whole year). Are there alternative patterns for implementing such a requirement?

By the way, I am quite new to Spark Structured Streaming, so I am primarily looking for pointers to avoid barking up the wrong tree.

Thanks in advance,
Nick

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

To unsubscribe e-mail: user-unsubscr...@spark.apache.org
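The per-user logic described in this question can be sketched as a small state-transition function. This is only a plain-Scala model under stated assumptions, not Spark code: the names UserState, UserActivity, update, timeoutAt and activeShare are hypothetical, and the 365-day threshold is an assumed reading of "within a year". In Structured Streaming, a function of this shape would typically be called from the body passed to mapGroupsWithState with GroupStateTimeout.EventTimeTimeout, using state.hasTimedOut for the timedOut flag and state.setTimeoutTimestamp with the value of timeoutAt.

```scala
// Plain-Scala model of the per-user state transition; no Spark dependency.
// All names here are hypothetical and chosen for illustration only.
final case class UserState(active: Boolean, lastPurchaseMs: Long)

object UserActivity {
  // Assumed inactivity threshold: 365 days, in milliseconds.
  val InactivityMs: Long = 365L * 24 * 60 * 60 * 1000

  // New state for one user, given the previous state (if any), the purchase
  // timestamps seen in this trigger, and whether the engine reported a timeout.
  // Simplification: a very late purchase still marks the user active.
  def update(prev: Option[UserState], purchasesMs: Seq[Long], timedOut: Boolean): UserState =
    if (timedOut) {
      // No purchase arrived before the timeout fired: mark the user inactive.
      UserState(active = false, prev.map(_.lastPurchaseMs).getOrElse(0L))
    } else {
      // Keep the most recent purchase time, tolerating out-of-order events.
      val candidates = purchasesMs ++ prev.map(_.lastPurchaseMs).toSeq
      UserState(active = true, candidates.foldLeft(0L)((a, b) => math.max(a, b)))
    }

  // Event-time instant at which the user should be considered inactive.
  def timeoutAt(state: UserState): Long = state.lastPurchaseMs + InactivityMs

  // Fraction of active users among the given states (0.0 for an empty input).
  def activeShare(states: Iterable[UserState]): Double =
    if (states.isEmpty) 0.0 else states.count(_.active).toDouble / states.size
}
```

Keeping the state to a flag and one timestamp per user is one way to limit how much has to be held for a year-long timeout; whether that is small enough depends on the number of users.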
Is there a plan for official spark-avro/spark-orc read/write library using Data Source V2
The Data Source V2 API is available in Spark 2.3, but currently there is no official library that uses it to read/write Avro or ORC files: spark-avro and spark-orc both use Data Source V1.

I wonder whether there is a plan upstream to implement those readers/writers and make them official libraries? Or maybe a Spark-Hive connector to read Avro/ORC tables using Data Source V2?

Although it is doable to implement such readers/writers on our own, depending on an official library would make the project easier to maintain.
Re: Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.
I can't... Do you think it could be a bug in this version? In Spark or in Kafka?

On Wed, Aug 29, 2018 at 22:28, Cody Koeninger wrote:
> Are you able to try a recent version of spark?
Re: Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.
Are you able to try a recent version of spark?

On Wed, Aug 29, 2018 at 2:10 AM, Guillermo Ortiz Fernández wrote:
> I'm using Spark Streaming 2.0.1 with Kafka 0.10. Sometimes I get this
> exception and Spark dies.
>
> I couldn't see any error or problem on the machines. Does anybody know the
> cause of this error?
>
> java.lang.IllegalStateException: This consumer has already been closed.
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787) ~[kafka-clients-1.0.0.jar:na]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091) ~[kafka-clients-1.0.0.jar:na]
>     at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:169) ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
>     at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:188) ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
>     at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:215) ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) ~[scala-library-2.11.11.jar:na]
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>     at scala.Option.orElse(Option.scala:289) ~[scala-library-2.11.11.jar:na]
>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>     at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) ~[scala-library-2.11.11.jar:na]
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) ~[scala-library-2.11.11.jar:na]
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library-2.11.11.jar:na]
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) ~[scala-library-2.11.11.jar:na]
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) ~[scala-library-2.11.11.jar:na]
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) ~[scala-library-2.11.11.jar:na]
>     at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>     at scala.util.Try$.apply(Try.scala:192) ~[scala-library-2.11.11.jar:na]
>     at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Re: Spark code to write to MySQL and Hive
Hi,

I haven't checked my answer (too lazy today), but I think I know what might be going on.

tl;dr Use cache to preserve the initial set of rows from MySQL.

After you append the new rows, the table has twice as many rows as it had previously. Correct? Since newDF references the table every time you use it in a structured query, say to write it to a table, the source table gets re-loaded and hence the number of rows changes.

What you should do is execute newDF.cache.count right after val newDF = mysqlDF.select... so the data (rows) remains on the executors and won't get reloaded.

Hope that helps.

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Wed, Aug 29, 2018 at 4:59 PM wrote:
> Sorry, last mail format was not good.
>
>     println("Going to talk to mySql")
>
>     // Read table from mySQL.
>     val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
>     println("I am back from mySql")
>
>     mysqlDF.show()
>
>     // Create a new Dataframe with column 'id' increased to avoid Duplicate primary keys
>     val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))
>     newDF.printSchema()
>     newDF.show()
>
>     // Insert records into the table.
>     newDF.write
>       .mode(SaveMode.Append)
>       .jdbc(jdbcUrl, table, properties)
>
>     // Write to Hive - This Creates a new table.
>     newDF.write.saveAsTable("cities")
>     newDF.show()
>
> Going to talk to mySql
> I am back from mySql
>
>     +---+--------------+---------+
>     | id|       country|     city|
>     +---+--------------+---------+
>     |  1|           USA|Palo Alto|
>     |  2|Czech Republic|     Brno|
>     |  3|           USA|Sunnyvale|
>     |  4|          null|     null|
>     +---+--------------+---------+
>
>     root
>      |-- id: long (nullable = false)
>      |-- country: string (nullable = true)
>      |-- city: string (nullable = true)
>
>     +---+--------------+---------+
>     | id|       country|     city|
>     +---+--------------+---------+
>     | 11|           USA|Palo Alto|
>     | 12|Czech Republic|     Brno|
>     | 13|           USA|Sunnyvale|
>     | 14|          null|     null|
>     +---+--------------+---------+
>
>     +---+--------------+---------+
>     | id|       country|     city|
>     +---+--------------+---------+
>     | 11|           USA|Palo Alto|
>     | 12|Czech Republic|     Brno|
>     | 13|           USA|Sunnyvale|
>     | 14|          null|     null|
>     | 24|          null|     null|
>     | 23|           USA|Sunnyvale|
>     | 22|Czech Republic|     Brno|
>     | 21|           USA|Palo Alto|
>     +---+--------------+---------+
>
> Thanks,
> Ravi
RE: Spark code to write to MySQL and Hive
Sorry, last mail format was not good.

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions.col

    // jdbcUrl, table and properties are defined elsewhere in the application.
    println("Going to talk to mySql")

    // Read table from mySQL.
    val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
    println("I am back from mySql")

    mysqlDF.show()

    // Create a new Dataframe with column 'id' increased to avoid Duplicate primary keys
    val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))
    newDF.printSchema()
    newDF.show()

    // Insert records into the table.
    newDF.write
      .mode(SaveMode.Append)
      .jdbc(jdbcUrl, table, properties)

    // Write to Hive - This Creates a new table.
    newDF.write.saveAsTable("cities")
    newDF.show()

Output:

    Going to talk to mySql
    I am back from mySql

    +---+--------------+---------+
    | id|       country|     city|
    +---+--------------+---------+
    |  1|           USA|Palo Alto|
    |  2|Czech Republic|     Brno|
    |  3|           USA|Sunnyvale|
    |  4|          null|     null|
    +---+--------------+---------+

    root
     |-- id: long (nullable = false)
     |-- country: string (nullable = true)
     |-- city: string (nullable = true)

    +---+--------------+---------+
    | id|       country|     city|
    +---+--------------+---------+
    | 11|           USA|Palo Alto|
    | 12|Czech Republic|     Brno|
    | 13|           USA|Sunnyvale|
    | 14|          null|     null|
    +---+--------------+---------+

    +---+--------------+---------+
    | id|       country|     city|
    +---+--------------+---------+
    | 11|           USA|Palo Alto|
    | 12|Czech Republic|     Brno|
    | 13|           USA|Sunnyvale|
    | 14|          null|     null|
    | 24|          null|     null|
    | 23|           USA|Sunnyvale|
    | 22|Czech Republic|     Brno|
    | 21|           USA|Palo Alto|
    +---+--------------+---------+

Thanks,
Ravi
Spark code to write to MySQL and Hive
Hi,

Can anyone help me understand what is happening with my code?

I wrote a Spark application that reads from a MySQL table (which already has 4 records) and creates a new DF by adding 10 to the ID field. Then I wanted to write the new DF to MySQL as well as to Hive.

I am surprised to see an additional set of records in Hive! I am not able to understand how newDF has records with IDs 21 to 24. I know that a DF is immutable. If so, how come it has 4 records at one point and 8 records at a later point?

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions.col

    // jdbcUrl, table and properties are defined elsewhere in the application.

    // Read table from mySQL.
    val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
    println("I am back from mySql")

    mysqlDF.show()

    // Create a new Dataframe with column 'id' increased to avoid Duplicate primary keys
    val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))
    newDF.printSchema()
    newDF.show()

    // Insert records into the MySQL table.
    newDF.write
      .mode(SaveMode.Append)
      .jdbc(jdbcUrl, table, properties)

    // Write to Hive - This Creates a new table.
    newDF.write.saveAsTable("cities")
    newDF.show()

Records already existing in mySql:

    +---+--------------+---------+
    | id|       country|     city|
    +---+--------------+---------+
    |  1|           USA|Palo Alto|
    |  2|Czech Republic|     Brno|
    |  3|           USA|Sunnyvale|
    |  4|          null|     null|
    +---+--------------+---------+

    root
     |-- id: long (nullable = false)
     |-- country: string (nullable = true)
     |-- city: string (nullable = true)

newDF.show():

    +---+--------------+---------+
    | id|       country|     city|
    +---+--------------+---------+
    | 11|           USA|Palo Alto|
    | 12|Czech Republic|     Brno|
    | 13|           USA|Sunnyvale|
    | 14|          null|     null|
    +---+--------------+---------+

    +---+--------------+---------+
    | id|       country|     city|
    +---+--------------+---------+
    | 11|           USA|Palo Alto|
    | 12|Czech Republic|     Brno|
    | 13|           USA|Sunnyvale|
    | 14|          null|     null|
    | 24|          null|     null|
    | 23|           USA|Sunnyvale|
    | 22|Czech Republic|     Brno|
    | 21|           USA|Palo Alto|
    +---+--------------+---------+

Thanks for your time.
Ravi
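The 4-then-8 rows effect asked about here can be reproduced with a plain-Scala analogy. This is not Spark code and only a sketch under assumptions: the object LazyReload and its members are hypothetical names, an ArrayBuffer stands in for the MySQL table, and a def stands in for newDF, which behaves like a recipe that is re-run against the table on each use rather than a fixed set of rows. Taking a snapshot before the append plays the role that newDF.cache.count plays in the reply in this thread (with the caveat that a Spark cache is best-effort, since cached data can be evicted and recomputed).

```scala
import scala.collection.mutable.ArrayBuffer

// Plain-Scala analogy; hypothetical names, no Spark involved.
object LazyReload {
  // Returns (ids materialised before the append, ids re-derived after it).
  def run(): (Vector[Int], Vector[Int]) = {
    // Stand-in for the MySQL table holding ids 1 to 4.
    val table = ArrayBuffer(1, 2, 3, 4)

    // Stand-in for newDF: re-reads the table every time it is used.
    def newIds: Vector[Int] = table.map(_ + 10).toVector

    val snapshot = newIds // evaluated once, before the table changes
    table ++= snapshot    // stand-in for the JDBC append
    (snapshot, newIds)    // the second use re-reads the grown table
  }
}
```

The snapshot keeps the four ids 11 to 14, while the re-derived result also contains 21 to 24, because the table it reads from now includes the appended rows.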
Re: Parallelism: behavioural difference in version 1.2 and 2.1!?
Dear Apostolos,

Thanks for the response! Our version is built on 2.1; the problem is that the state-of-the-art system I'm trying to compare against is built on version 1.2, so I have to deal with it.

If I understand the level of parallelism correctly, --total-executor-cores is set to the number of workers multiplied by the executor cores of each worker, in this case 32 as well. I use a similar script in both cases, so it shouldn't change.

Thanks and regards,
Jeevan K. Srivatsa

On Wed, 29 Aug 2018 at 16:07, Apostolos N. Papadopoulos <papad...@csd.auth.gr> wrote:
> Dear Jeevan,
>
> Spark 1.2 is quite old, and if I were you I would go for a newer version.
>
> However, is there a parallelism level (e.g., 20, 30) that works for both
> installations?
>
> regards,
>
> Apostolos
Re: Parallelism: behavioural difference in version 1.2 and 2.1!?
Dear Jeevan,

Spark 1.2 is quite old, and if I were you I would go for a newer version.

However, is there a parallelism level (e.g., 20, 30) that works for both installations?

regards,

Apostolos

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://delab.csd.auth.gr/~apostol
Parallelism: behavioural difference in version 1.2 and 2.1!?
Hi,

I have two systems. One is built on Spark 1.2 and the other on 2.1. I am benchmarking both with the same benchmarks (wordcount, grep, sort, etc.) and the same data set from an S3 bucket (sizes range from 50 MB to 10 GB). The Spark cluster I used is r3.xlarge, 8 instances, 4 cores each, and 28 GB RAM. I observed the following strange behaviour while running the benchmarks:

- When I ran the Spark 1.2 version with the default partition number (sc.defaultParallelism), the jobs took forever to complete. So I changed it to three times the number of cores, i.e., 32 times 3 = 96. This worked like magic and the jobs completed quickly.

- However, when I tried the same magic number on version 2.1, the jobs take forever. Default parallelism works better, but is still not that efficient.

I'm having trouble rationalising this and comparing the two systems. My question is: what changes were made from 1.2 to 2.1 with respect to default parallelism for this behaviour to occur? How can I make both versions behave similarly on the same software/hardware configuration so that I can compare them?

I'd really appreciate your help on this!

Cheers,
Jeevan
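The numbers in this post can be written out as a small calculation. This is a hedged sketch, not a diagnosis: ClusterParallelism, Cluster, partitions and bytesPerPartition are hypothetical names, and the only inputs are figures from the post (8 instances, 4 cores each, a factor of 3, inputs of 50 MB to 10 GB). One way to make the two versions comparable might be to pass the same explicit partition count in both, for example through the minPartitions argument of sc.textFile or the spark.default.parallelism setting, instead of relying on each version's defaults.

```scala
// Hypothetical helper names; the figures come from the post above.
object ClusterParallelism {
  final case class Cluster(instances: Int, coresPerInstance: Int) {
    // Total cores available across the cluster.
    def totalCores: Int = instances * coresPerInstance
  }

  // Partition count expressed as a multiple of the total core count.
  def partitions(cluster: Cluster, factor: Int): Int =
    cluster.totalCores * factor

  // Approximate input bytes per partition (integer division).
  def bytesPerPartition(inputBytes: Long, numPartitions: Int): Long =
    inputBytes / numPartitions
}
```

With 96 partitions, a 50 MB input leaves roughly half a megabyte per partition while a 10 GB input leaves roughly 107 MiB, so the same partition count may behave quite differently at the two ends of the benchmark's size range.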
Re: Which Py4J version goes with Spark 2.3.1?
Hi,

I think the best option is to use the py4j that is either installed automatically with "pip install pyspark" or, when we unzip the Spark download from its site, found in the SPARK_HOME/python/lib folder.

Regards,
Gourav Sengupta

On Wed, Aug 29, 2018 at 8:00 AM Aakash Basu wrote:
> Hi,
>
> Which Py4J version goes with Spark 2.3.1? I have py4j-0.10.7 but it throws
> an error because of certain compatibility issues with Spark 2.3.1.
>
> Error:
>
> [2018-08-29] [06:46:56] [ERROR] - Traceback (most recent call last):
>   File "", line 120, in run
>   File "/data/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 441, in csv
>     return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
>   File "/data/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File "/data/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
>     return f(*a, **kw)
>   File "/data/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
>     format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o49.csv.
>
> Any help?
>
> Thanks,
> Aakash.
Spark udf from external jar without enabling Hive
Hi Team,

I am creating a UDF from an external jar as follows:

    val spark = SparkSession.builder.appName("UdfUser")
      .master("local")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("CREATE FUNCTION uppercase AS 'path.package.udf.UpperCase' " +
      "USING JAR '/home/swapnil/udf-jars/spark-udf-1.0.jar'")

Can I do the same without enabling Hive support (i.e. with the in-memory catalog)? I tried but could not find a solution.

Any help is really appreciated.

Thanks,
Swapnil
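One possible direction, sketched here under assumptions, is to express the function as a plain Scala function and register it on the session with spark.udf.register, which does not require enableHiveSupport(). The object UpperCaseUdf and the value upperCase are hypothetical names, and the sketch assumes that the logic of 'path.package.udf.UpperCase' amounts to upper-casing a string; the actual class in the jar may differ. The registration calls are shown as comments because they need a running SparkSession.

```scala
// Hypothetical stand-in for the logic of the UDF class in the external jar.
object UpperCaseUdf {
  // Null-safe upper-casing: returns null when the input is null.
  val upperCase: String => String =
    s => if (s == null) null else s.toUpperCase

  // With a SparkSession named `spark` built without enableHiveSupport():
  //   spark.udf.register("uppercase", upperCase)
  //   spark.sql("SELECT uppercase('abc')").show()
}
```

A function registered this way is scoped to the session, so it has to be registered again each time the application starts.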
java.lang.OutOfMemoryError: Java heap space - Spark driver.
I got this error from the Spark driver. It seems that I should increase the driver memory, although it is already 5g (with 4 cores). It seems weird to me because I'm not using Kryo or broadcast in this process, yet the log contains references to Kryo and broadcast. How can I figure out the cause of this OutOfMemoryError? Is it normal that there are references to Kryo and broadcasting when I'm not using them?

05:11:19.110 [streaming-job-executor-0] WARN c.datastax.driver.core.CodecRegistry - Ignoring codec DateRangeCodec ['org.apache.cassandra.db.marshal.DateRangeType' <-> com.datastax.driver.dse.search.DateRange] because it collides with previously registered codec DateRangeCodec ['org.apache.cassandra.db.marshal.DateRangeType' <-> com.datastax.driver.dse.search.DateRange]
05:11:26.806 [dag-scheduler-event-loop] WARN org.apache.spark.util.Utils - Suppressing exception in finally: Java heap space
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[na:1.8.0_162]
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[na:1.8.0_162]
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:231) ~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:231) ~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87) ~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75) ~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205) ~[lz4-1.3.0.jar:na]
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158) ~[lz4-1.3.0.jar:na]
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:181) ~[kryo-3.0.3.jar:na]
    at com.esotericsoftware.kryo.io.Output.close(Output.java:191) ~[kryo-3.0.3.jar:na]
    at org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:209) ~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$1.apply$mcV$sp(TorrentBroadcast.scala:238) ~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319) ~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:237) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:107) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:86) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1387) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1012) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:936) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:935) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at scala.collection.immutable.List.foreach(List.scala:392) [scala-library-2.11.11.jar:na]
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:935) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:873) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1630) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
05:40:53.535 [dse-app-client-thread-pool-0] WARN c.datastax.driver.core.CodecRegistry - Ignoring codec DateRangeCodec ['org.apache.cassandra.db.marshal.DateRangeType'
Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.
I'm using Spark Streaming 2.0.1 with Kafka 0.10. Sometimes I get this exception and Spark dies. I couldn't find any error or problem on the machines. Does anybody know the cause of this error?

java.lang.IllegalStateException: This consumer has already been closed.
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787) ~[kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091) ~[kafka-clients-1.0.0.jar:na]
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:169) ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:188) ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:215) ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) ~[scala-library-2.11.11.jar:na]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at scala.Option.orElse(Option.scala:289) ~[scala-library-2.11.11.jar:na]
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) ~[scala-library-2.11.11.jar:na]
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) ~[scala-library-2.11.11.jar:na]
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library-2.11.11.jar:na]
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) ~[scala-library-2.11.11.jar:na]
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) ~[scala-library-2.11.11.jar:na]
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) ~[scala-library-2.11.11.jar:na]
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at scala.util.Try$.apply(Try.scala:192) ~[scala-library-2.11.11.jar:na]
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) ~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
Which Py4J version goes with Spark 2.3.1?
Hi, which Py4J version goes with Spark 2.3.1? I have py4j-0.10.7, but it throws an error because of some compatibility issue with Spark 2.3.1.

Error:
[2018-08-29] [06:46:56] [ERROR] - Traceback (most recent call last):
  File "", line 120, in run
  File "/data/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 441, in csv
    return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File "/data/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/data/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/data/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o49.csv.

Any help? Thanks, Aakash.
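
One way to check the pairing: the traceback loads py4j-0.10.7-src.zip from the Spark distribution's own python/lib directory, which suggests that 0.10.7 is the version shipped with this 2.3.1 build. The following is a minimal sketch for confirming that on a given machine, assuming the usual layout where the Py4J source zip sits under `<SPARK_HOME>/python/lib`. The helper name `bundled_py4j_versions` is made up for illustration and is not part of PySpark or Py4J.

```python
import glob
import os
import re


def bundled_py4j_versions(spark_home):
    """Return the Py4J versions found under <spark_home>/python/lib.

    Hypothetical helper: it only inspects file names of the form
    py4j-<version>-src.zip and does not import or run Py4J.
    """
    pattern = os.path.join(spark_home, "python", "lib", "py4j-*-src.zip")
    versions = []
    for path in sorted(glob.glob(pattern)):
        match = re.match(r"py4j-(.+)-src\.zip$", os.path.basename(path))
        if match:
            versions.append(match.group(1))
    return versions


if __name__ == "__main__":
    # The fallback path is the one that appears in the traceback.
    home = os.environ.get("SPARK_HOME", "/data/spark-2.3.1-bin-hadoop2.7")
    print(bundled_py4j_versions(home))
```

If the version listed there matches the one on the Python path, a version mismatch becomes less likely as the cause. A Py4JJavaError is raised when the call on the Java side throws, so the Java exception that normally follows the "An error occurred while calling o49.csv" line is probably the more useful thing to look at.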