[ANNOUNCE] Apache Spark 2.1.1
We are happy to announce the availability of Spark 2.1.1!

Apache Spark 2.1.1 is a maintenance release, based on the branch-2.1 maintenance branch of Spark. We strongly recommend that all 2.1.x users upgrade to this stable release.

To download Apache Spark 2.1.1, visit http://spark.apache.org/downloads.html

We would like to acknowledge all community members for contributing patches to this release.
Re: Joins in Spark
Sorry, I had a typo. I meant repartitionby("fieldofjoin").
Re: Joins in Spark
Hi Angel,

I tried the code below, but I don't see the partitioning take effect on the dataframe.

val iftaGPSLocation_df = sqlContext.sql(iftaGPSLocQry)
import sqlContext._
import sqlContext.implicits._
datapoint_prq_df.join(geoCacheLoc_df)
val tableA = DfA.partitionby("joinField").filter("firstSegment")

The columns I have are Lat3, Lon3, VIN, and Time. Lat3 and Lon3 are my join columns on both dataframes; the rest are select columns.

Thanks,
Asmath
Re: [Spark Streaming] Dynamic Broadcast Variable Update
One, I think you should take this to the spark developer list. Two, I suspect broadcast variables aren't the best solution for the use case you describe. Maybe an in-memory data/object/file store like Tachyon is a better fit.

Thanks,
Tim
[Spark Streaming] Dynamic Broadcast Variable Update
Hi All,

To support our Spark Streaming based anomaly detection tool, we have made a patch in Spark 1.6.2 to dynamically update broadcast variables.

I'll first explain our use case, which I believe should be common to several people using Spark Streaming applications. Broadcast variables are often used to store values such as machine learning models, which can then be used on streaming data to "test" and get the desired results (in our case, anomalies). Unfortunately, in current Spark, broadcast variables are final and can only be initialized once, before the initialization of the streaming context. Hence, if a new model is learned, the streaming system cannot be updated without shutting down the application, broadcasting again, and restarting the application. Our goal was to re-broadcast variables without requiring downtime of the streaming service.

The key to this implementation is a live re-broadcastVariable() interface, which can be triggered in between micro-batch executions, without any reboot required for the streaming application. At a high level, the task is done by re-fetching broadcast variable information from the Spark driver and then re-distributing it to the workers. The micro-batch execution is blocked while the update is made, by taking a lock on the execution. We have already tested this in a prototype deployment of our anomaly detection service and can successfully re-broadcast the broadcast variables with no downtime.

We would like to integrate these changes into Spark. Can anyone please let me know the process for submitting patches/new features to Spark? Also, I understand that the current version of Spark is 2.1; however, our changes have been made and tested on Spark 1.6.2. Will this be a problem?

Thanks,
Nipun
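For readers who cannot patch Spark: a common workaround on unmodified Spark is to keep the broadcast handle behind a mutable reference that the driver swaps between micro-batches (unpersisting the old broadcast and broadcasting the new model). Below is a minimal plain-Python sketch of the swap logic only; the class name is made up, and the actual sc.broadcast()/unpersist() calls are elided to comments.

```python
import threading

# Hypothetical holder: the driver calls update() between micro-batches;
# batch code calls get() at the start of each batch.
class RefreshableModel:
    def __init__(self, model):
        self._lock = threading.Lock()
        self._model = model

    def get(self):
        # In Spark, this would return the current Broadcast handle.
        with self._lock:
            return self._model

    def update(self, new_model):
        # In Spark: old_broadcast.unpersist(); self._model = sc.broadcast(new_model)
        with self._lock:
            self._model = new_model

holder = RefreshableModel({"threshold": 0.5})
holder.update({"threshold": 0.7})  # a newly learned model arrives between batches
print(holder.get())
```

Because each batch re-reads the holder, no restart of the streaming context is needed; the lock plays the role of the execution lock described in the post.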
Re: do we need to enable writeAheadLogs for DirectStream as well or is it only for indirect stream?
You don't need write ahead logs for direct stream.

- To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Initialize Gaussian Mixture Model using Spark ML dataframe API
Yes, I noticed these open issues, both with KMeans and GMM: https://issues.apache.org/jira/browse/SPARK-13025

Thanks,
Tim

On Mon, May 1, 2017 at 9:01 PM, Yanbo Liang wrote:
> Hi Tim,
>
> The Spark ML API doesn't currently support setting an initial model for GMM. I hope we can get this feature into Spark 2.3.
>
> Thanks,
> Yanbo
>
> On Fri, Apr 28, 2017 at 1:46 AM, Tim Smith wrote:
>
>> Hi,
>>
>> I am trying to figure out the API to initialize a Gaussian mixture model using either centroids created by K-means or a previously calculated GMM model (I am aware that you can "save" a model and "load" it later, but I am not interested in saving a model to a filesystem).
>>
>> The Spark MLlib API lets you do this using setInitialModel:
>> https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.mllib.clustering.GaussianMixture
>>
>> However, I cannot figure out how to do this using the Spark ML API. Can anyone please point me in the right direction? I've tried reading the Spark ML code and was wondering if the "set" call lets you do that?
>>
>> --
>> Thanks,
>>
>> Tim
Re: Driver spins hours in query plan optimization
Seems like https://issues.apache.org/jira/browse/SPARK-13346 is likely the same issue. It seems that for some people persist() doesn't work, and they have to convert to RDDs and back.

On Fri, Apr 14, 2017 at 1:39 PM, Everett Anderson wrote:
> Hi,
>
> We keep hitting a situation on Spark 2.0.2 (haven't tested later versions yet) where the driver spins forever, seemingly in query plan optimization, for moderate queries such as the union of a few (~5) other DataFrames.
>
> We can see the driver spinning with one core in the nioEventLoopGroup-2-2 thread in a deep trace like the attached.
>
> Throwing in a MEMORY_OR_DISK persist() so the query plan is collapsed works around this, but it's a little surprising how often we encounter the problem, forcing us to work to manage persisting/unpersisting tables and potentially suffering unnecessary disk I/O.
>
> I've looked through JIRA but don't see open issues about this -- I might've just not found them successfully.
>
> Has anyone else encountered this?
Re: Schema Evolution for nested Dataset[T]
Hi Michael,

Thank you for the suggestions. I am wondering how I can make `withColumn` handle a nested structure?

For example, below is my code to generate the data. I basically add the `age` field to `Person2`, which is nested in an Array for Course2. Then I want to fill in 0 for age where age is null.

case class Person1(name: String)
case class Person2(name: String, age: Int)
case class Course1(id: Int, students: Array[Person1])
case class Course2(id: Int, students: Array[Person2])

Seq(Course1(10, Array(Person1("a"), Person1("b")))).toDF.write.parquet("data1")
Seq(Course2(20, Array(Person2("c", 20), Person2("d", 10)))).toDF.write.parquet("data2")

val allData = spark.read.option("mergeSchema", "true").parquet("data1", "data2")
allData.show

+---+--------------------+
| id|            students|
+---+--------------------+
| 20|    [[c,20], [d,10]]|
| 10|[[a,null], [b,null]]|
+---+--------------------+

My first try:

allData.withColumn("students.age", coalesce($"students.age", lit(0)))

It returns the exception:

org.apache.spark.sql.AnalysisException: cannot resolve 'coalesce(`students`.`age`, 0)' due to data type mismatch: input to function coalesce should all be the same type, but it's [array, int];;

My second try:

allData.withColumn("students.age", coalesce($"students.age", array(lit(0), lit(0)))).show

+---+--------------------+------------+
| id|            students|students.age|
+---+--------------------+------------+
| 20|    [[c,20], [d,10]]|    [20, 10]|
| 10|[[a,null], [b,null]]|[null, null]|
+---+--------------------+------------+

It creates a new column "students.age" instead of imputing the age value nested in students.

Thank you very much in advance.

Mike

On Mon, May 1, 2017 at 10:31 AM, Michael Armbrust wrote:
> Oh, and if you want a default other than null:
>
> import org.apache.spark.sql.functions._
> df.withColumn("address", coalesce($"address", lit()))
>
> On Mon, May 1, 2017 at 10:29 AM, Michael Armbrust wrote:
>
>> The following should work:
>>
>> val schema = implicitly[org.apache.spark.sql.Encoder[Course]].schema
>> spark.read.schema(schema).parquet("data.parquet").as[Course]
>>
>> Note this will only work for nullable fields (i.e. if you add a primitive like Int you need to make it an Option[Int]).
>>
>> On Sun, Apr 30, 2017 at 9:12 PM, Mike Wheeler <rotationsymmetr...@gmail.com> wrote:
>>
>>> Hi Spark Users,
>>>
>>> Suppose I have some data (stored in parquet, for example) generated as below:
>>>
>>> package com.company.entity.old
>>> case class Course(id: Int, students: List[Student])
>>> case class Student(name: String)
>>>
>>> Then usually I can access the data by:
>>>
>>> spark.read.parquet("data.parquet").as[Course]
>>>
>>> Now I want to add a new field `address` to Student:
>>>
>>> package com.company.entity.new
>>> case class Course(id: Int, students: List[Student])
>>> case class Student(name: String, address: String)
>>>
>>> Then obviously running `spark.read.parquet("data.parquet").as[Course]` on data generated by the old entity/schema will fail because `address` is missing.
>>>
>>> In this case, what is the best practice to read data generated with the old entity/schema into the new entity/schema, with the missing field set to some default value? I know I can manually write a function to do the transformation from the old to the new, but it is kind of tedious. Any automatic methods?
>>>
>>> Thanks,
>>>
>>> Mike
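Since withColumn cannot address a field nested inside an array (the second try simply creates a new top-level column literally named "students.age"), the usual workaround is to rebuild the array of structs, for example with a typed map over the Dataset or a UDF. Here is a plain-Python sketch of that per-row rebuild, with no Spark involved; the data mirrors the example above and the function name is made up.

```python
# Rebuild each course's student list, defaulting a missing age to 0.
# In Spark, the same logic would live inside allData.as[Course].map(...)
# (a typed map) or inside a UDF that reconstructs the struct array.
def impute_age(course, default=0):
    students = [
        {"name": s["name"], "age": s["age"] if s["age"] is not None else default}
        for s in course["students"]
    ]
    return {"id": course["id"], "students": students}

all_data = [
    {"id": 20, "students": [{"name": "c", "age": 20}, {"name": "d", "age": 10}]},
    {"id": 10, "students": [{"name": "a", "age": None}, {"name": "b", "age": None}]},
]
fixed = [impute_age(c) for c in all_data]
print(fixed)
```

The key point is that the whole array element (the struct) is rebuilt, rather than trying to assign into a nested path with withColumn.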
Re: Joins in Spark
Have you tried partitioning by the join's field and running it by segments, filtering both tables to the same segments of data?

Example:

val tableA = DfA.partitionby("joinField").filter("firstSegment")
val tableB = DfB.partitionby("joinField").filter("firstSegment")

tableA.join(tableB)
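A plain-Python sketch of the idea above, with no Spark and made-up table contents and segment rule: filter both tables to the same segment of the join key, join within each segment, and union the pieces. In Spark the analogous calls would be a repartition on the join column plus a filter per segment ("partitionby" as written in the thread is not a DataFrame method).

```python
# Illustrative only: two "tables" as lists of dicts, joined segment by
# segment on joinField so each piece of the join is small.
table_a = [{"joinField": 1, "a": "a1"}, {"joinField": 2, "a": "a2"}, {"joinField": 3, "a": "a3"}]
table_b = [{"joinField": 1, "b": "b1"}, {"joinField": 2, "b": "b2"}, {"joinField": 4, "b": "b4"}]

# Split the key space into segments (here: even and odd keys).
segments = [lambda k: k % 2 == 0, lambda k: k % 2 == 1]

joined = []
for in_segment in segments:
    seg_a = [r for r in table_a if in_segment(r["joinField"])]
    seg_b = [r for r in table_b if in_segment(r["joinField"])]
    joined += [
        (ra["joinField"], ra["a"], rb["b"])
        for ra in seg_a
        for rb in seg_b
        if ra["joinField"] == rb["joinField"]
    ]

print(sorted(joined))
```

Because the segments partition the key space, every matching key lands in exactly one segment, so the union of the segment joins equals the full join.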
Re: Joins in Spark
Table 1 (192 GB) is partitioned by year and month; the 192 GB of data is for one month, i.e. April.

Table 2: 92 GB, not partitioned.

I have to perform a join on these tables now.
Re: Joins in Spark
Hello,

Are the tables partitioned? If yes, what is the partition field?

Thanks
Joins in Spark
Hi,

I am trying to join two big tables in Spark, and the job runs for quite a long time without any results.

Table 1: 192 GB
Table 2: 92 GB

Does anyone have a better solution to get the results faster?

Thanks,
Asmath
do we need to enable writeAheadLogs for DirectStream as well or is it only for indirect stream?
Hi All,

I need some fault tolerance for my stateful computations, and I am wondering why we need to enable writeAheadLogs for a DirectStream like Kafka (for an indirect stream it makes sense). In case of driver failure, a DirectStream such as Kafka can pull the messages again from the last committed offset, right?

Thanks!
Re: --jars does not take remote jar?
I see. Thanks!
Re: --jars does not take remote jar?
On Tue, May 2, 2017 at 9:07 AM, Nan Zhu wrote:
> I have no easy way to pass the jar path to those forked Spark applications? (Except that I download the jar from a remote path to a local temp dir after resolving some permission issues, etc.?)

Yes, that's the only way currently in client mode.

--
Marcelo
Re: --jars does not take remote jar?
Thanks for the reply!

If I have an application master which starts some Spark applications by forking processes (in yarn-client mode), do I then have no easy way to pass a jar path to those forked Spark applications? (Except downloading the jar from a remote path to a local temp dir after resolving some permission issues, etc.?)
Re: --jars does not take remote jar?
Remote jars are added to executors' classpaths, but not the driver's. In YARN cluster mode, they would also be added to the driver's class path.

--
Marcelo
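A sketch of the two submission modes discussed above (the class name, jar paths, and application jar are made up for illustration):

```shell
# yarn-cluster mode: an hdfs:// entry in --jars reaches both the
# driver's and the executors' classpaths.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.example.Main \
  --jars hdfs:///libs/dep.jar \
  app.jar

# yarn-client mode: the same --jars entry reaches only the executors.
# The driver needs a local copy, e.g. via --driver-class-path /local/path/dep.jar,
# or the jar must be downloaded locally first.
```

This matches the behavior described in the reply: only cluster mode distributes remote jars to the driver.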
--jars does not take remote jar?
Hi, all

For some reason, I tried to pass an HDFS path to the --jars option in spark-submit.

According to the documentation (http://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management), --jars should accept a remote path.

However, in the implementation (https://github.com/apache/spark/blob/c622a87c44e0621e1b3024fdca9b2aa3c508615b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L757), it does not look like it does.

Did I miss anything?

Best,

Nan
OutOfMemoryError
Hi Spark Users,

I have a dataset with ~5M rows x 20 columns, containing a groupID and a rowID. My goal is to check whether (some) columns contain more than a fixed fraction (say, 50%) of missing (null) values within a group. If this is found, the entire column is set to missing (null) for that group.

The problem: the loop runs like a charm during the first iterations, but towards the end, around the 6th or 7th iteration, I see my CPU utilization dropping (using 1 instead of 6 cores). Along with that, execution time for one iteration increases significantly. At some point, I get an OutOfMemoryError:

* spark.driver.memory < 4G: at collect() (FAIL 1)
* 4G <= spark.driver.memory < 10G: at the count() step (FAIL 2)

Enabling a heap dump on OOM (and analyzing it with Eclipse MAT) showed two classes taking up lots of memory:

* java.lang.Thread
  - char (2G)
  - scala.collection.IndexedSeqLike
  - scala.collection.mutable.WrappedArray (1G)
  - java.lang.String (1G)
* org.apache.spark.sql.execution.ui.SQLListener
  - org.apache.spark.sql.execution.ui.SQLExecutionUIData (several of up to 1G in size)
  - java.lang.String
  - ...

Turning off the Spark UI and/or setting spark.ui.retainedXXX to something low (e.g. 1) did not solve the issue.

Any idea what I am doing wrong? Or is this a bug?

My code can be found as a GitHub Gist [0]. More details can be found in the StackOverflow question [1] I posted, which has not received any answers so far.

Thanks!

[0] https://gist.github.com/TwUxTLi51Nus/4accdb291494be9201abfad72541ce74
[1] http://stackoverflow.com/questions/43637913/apache-spark-outofmemoryerror-heapspace

PS: As a workaround, I have been writing and reading temporary parquet files on each loop iteration.

--
Tw UxTLi51Nus
Email: twuxtli51...@posteo.de
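A plain-Python sketch of the per-group rule described above, with no Spark involved (column names and data are made up; the 50% threshold follows the post): if a column is null in at least half of a group's rows, null it out for the whole group.

```python
def mask_columns(rows, cols, threshold=0.5):
    """Null out each column whose null fraction within this group >= threshold."""
    n = len(rows)
    to_mask = [c for c in cols if sum(r.get(c) is None for r in rows) / n >= threshold]
    return [{**r, **{c: None for c in to_mask}} for r in rows]

# One group of four rows: x is 25% null (kept), y is 75% null (masked).
group = [
    {"x": 1, "y": None},
    {"x": 2, "y": None},
    {"x": None, "y": 3},
    {"x": 4, "y": None},
]
masked = mask_columns(group, ["x", "y"])
print(masked)
```

In Spark, the null counts per group would come from one aggregation (e.g. count of nulls per column per groupID) rather than per-row collection on the driver, which sidesteps the growing collect() the post describes.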