Re: SparkR error when repartition is called

2016-08-08 Thread Sun Rui
I can’t reproduce your issue with len=1 in local mode.
Could you give more environment information?
> On Aug 9, 2016, at 11:35, Shane Lee  wrote:
> 
> Hi All,
> 
> I am trying out SparkR 2.0 and have run into an issue with repartition. 
> 
> Here is the R code (essentially a port of the pi-calculating scala example in 
> the spark package) that can reproduce the behavior:
> 
> schema <- structType(structField("input", "integer"), 
> structField("output", "integer"))
> 
> library(magrittr)
> 
> len = 3000
> data.frame(n = 1:len) %>%
> as.DataFrame %>%
> SparkR:::repartition(10L) %>%
>   dapply(., function (df)
>   {
>   library(plyr)
>   ddply(df, .(n), function (y)
>   {
>   data.frame(z = 
>   {
>   x1 = runif(1) * 2 - 1
>   y1 = runif(1) * 2 - 1
>   z = x1 * x1 + y1 * y1
>   if (z < 1)
>   {
>   1L
>   }
>   else
>   {
>   0L
>   }
>   })
>   })
>   }
>   , schema
>   ) %>% 
>   SparkR:::summarize(total = sum(.$output)) %>% collect * 4 / len
> 
> For me it runs fine as long as len is less than 5000, otherwise it errors out 
> with the following message:
> 
> Error in invokeJava(isStatic = TRUE, className, methodName, ...) : 
>   org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 
> in stage 56.0 failed 4 times, most recent failure: Lost task 6.3 in stage 
> 56.0 (TID 899, LARBGDV-VM02): org.apache.spark.SparkException: R computation 
> failed with
>  Error in readBin(con, raw(), stringLen, endian = "big") : 
>   invalid 'n' argument
> Calls:  -> readBin
> Execution halted
>   at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
>   at 
> org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:59)
>   at 
> org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:29)
>   at 
> org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:178)
>   at 
> org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:175)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$
> 
> If the repartition call is removed, it runs fine again, even with very large 
> len.
> 
> After looking through the documentations and searching the web, I can't seem 
> to find any clues how to fix this. Anybody has seen similary problem?
> 
> Thanks in advance for your help.
> 
> Shane
> 



How to get the parameters of bestmodel while using paramgrid and crossvalidator?

2016-08-08 Thread colin
I'm using CrossValidator and paramgrid to find the best parameters of my
model.
After crossvalidate, I got a CrossValidatorModel but when I want to get the
parameters of my pipeline, there's nothing in the parameter list of
bestmodel.

Here's the code runing on jupyter notebook:
sq=SQLContext(sc)
df=sq.createDataFrame([(1,0,1),(2,0,2),(1,0,1),(2,1,2),(1,1,1),(2,1,2),(3,1,1),(2,1,2),(1,1,2)],["a","b","c"])
lableIndexer=StringIndexer(inputCol="c",outputCol="label").fit(df)
vecAssembler=VectorAssembler(inputCols=["a","b"],outputCol="featureVector")
lr=LogisticRegression(featuresCol="featureVector",labelCol="label")
pip=Pipeline()
pip.setStages([lableIndexer,vecAssembler,lr])
grid=ParamGridBuilder().addGrid(lr.maxIter,[10,20,30,40]).build()
evaluator=MulticlassClassificationEvaluator()
cv=CrossValidator(estimator=pip,estimatorParamMaps=grid,evaluator=evaluator,numFolds=3)
cvModel=cv.fit(df)
cvPrediction=cvModel.transform(df)
cvPrediction
0.66
cvModel.bestModel.params
[]

There's no error, so what's wrong?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-the-parameters-of-bestmodel-while-using-paramgrid-and-crossvalidator-tp27497.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Machine learning question (suing spark)- removing redundant factors while doing clustering

2016-08-08 Thread Robin East
Another approach is to use L1 regularisation eg 
http://spark.apache.org/docs/latest/mllib-linear-methods.html#linear-least-squares-lasso-and-ridge-regression.
 This adds a penalty term to the regression equation to reduce model 
complexity. When you use L1 (as opposed to say L2) this tends to promote 
sparsity in the coefficients i.e.some of the coefficients are pushed to zero, 
effectively deselecting them from the model.

Sent from my iPhone

> On 9 Aug 2016, at 04:19, Peyman Mohajerian  wrote:
> 
> You can try 'feature Importances' or 'feature selection' depending on what 
> else you want to do with the remaining features that's a possibility. Let's 
> say you are trying to do classification then some of the Spark Libraries have 
> a model parameter called 'featureImportances' that tell you which feature(s) 
> are more dominant in you classification, you can then run your model again 
> with the smaller set of features. 
> The two approaches are quite different, what I'm suggesting involves training 
> (supervised learning) in the context of a target function, with SVD you are 
> doing unsupervised learning.
> 
>> On Mon, Aug 8, 2016 at 7:23 PM, Rohit Chaddha  
>> wrote:
>> I would rather have less features to make better inferences on the data 
>> based on the smaller number of factors, 
>> Any suggestions Sean ? 
>> 
>>> On Mon, Aug 8, 2016 at 11:37 PM, Sean Owen  wrote:
>>> Yes, that's exactly what PCA is for as Sivakumaran noted. Do you
>>> really want to select features or just obtain a lower-dimensional
>>> representation of them, with less redundancy?
>>> 
>>> On Mon, Aug 8, 2016 at 4:10 PM, Tony Lane  wrote:
>>> > There must be an algorithmic way to figure out which of these factors
>>> > contribute the least and remove them in the analysis.
>>> > I am hoping same one can throw some insight on this.
>>> >
>>> > On Mon, Aug 8, 2016 at 7:41 PM, Sivakumaran S  wrote:
>>> >>
>>> >> Not an expert here, but the first step would be devote some time and
>>> >> identify which of these 112 factors are actually causative. Some domain
>>> >> knowledge of the data may be required. Then, you can start of with PCA.
>>> >>
>>> >> HTH,
>>> >>
>>> >> Regards,
>>> >>
>>> >> Sivakumaran S
>>> >>
>>> >> On 08-Aug-2016, at 3:01 PM, Tony Lane  wrote:
>>> >>
>>> >> Great question Rohit.  I am in my early days of ML as well and it would 
>>> >> be
>>> >> great if we get some idea on this from other experts on this group.
>>> >>
>>> >> I know we can reduce dimensions by using PCA, but i think that does not
>>> >> allow us to understand which factors from the original are we using in 
>>> >> the
>>> >> end.
>>> >>
>>> >> - Tony L.
>>> >>
>>> >> On Mon, Aug 8, 2016 at 5:12 PM, Rohit Chaddha 
>>> >> 
>>> >> wrote:
>>> >>>
>>> >>>
>>> >>> I have a data-set where each data-point has 112 factors.
>>> >>>
>>> >>> I want to remove the factors which are not relevant, and say reduce to 
>>> >>> 20
>>> >>> factors out of these 112 and then do clustering of data-points using 
>>> >>> these
>>> >>> 20 factors.
>>> >>>
>>> >>> How do I do these and how do I figure out which of the 20 factors are
>>> >>> useful for analysis.
>>> >>>
>>> >>> I see SVD and PCA implementations, but I am not sure if these give which
>>> >>> elements are removed and which are remaining.
>>> >>>
>>> >>> Can someone please help me understand what to do here
>>> >>>
>>> >>> thanks,
>>> >>> -Rohit
>>> >>>
>>> >>
>>> >>
>>> >
> 


Re: Machine learning question (suing spark)- removing redundant factors while doing clustering

2016-08-08 Thread Rohit Chaddha
@Peyman - does any of the clustering algorithms have "feature Importance"
or "feature selection" ability ?  I can't seem to pinpoint



On Tue, Aug 9, 2016 at 8:49 AM, Peyman Mohajerian 
wrote:

> You can try 'feature Importances' or 'feature selection' depending on what
> else you want to do with the remaining features that's a possibility. Let's
> say you are trying to do classification then some of the Spark Libraries
> have a model parameter called 'featureImportances' that tell you which
> feature(s) are more dominant in you classification, you can then run your
> model again with the smaller set of features.
> The two approaches are quite different, what I'm suggesting involves
> training (supervised learning) in the context of a target function, with
> SVD you are doing unsupervised learning.
>
> On Mon, Aug 8, 2016 at 7:23 PM, Rohit Chaddha 
> wrote:
>
>> I would rather have less features to make better inferences on the data
>> based on the smaller number of factors,
>> Any suggestions Sean ?
>>
>> On Mon, Aug 8, 2016 at 11:37 PM, Sean Owen  wrote:
>>
>>> Yes, that's exactly what PCA is for as Sivakumaran noted. Do you
>>> really want to select features or just obtain a lower-dimensional
>>> representation of them, with less redundancy?
>>>
>>> On Mon, Aug 8, 2016 at 4:10 PM, Tony Lane 
>>> wrote:
>>> > There must be an algorithmic way to figure out which of these factors
>>> > contribute the least and remove them in the analysis.
>>> > I am hoping same one can throw some insight on this.
>>> >
>>> > On Mon, Aug 8, 2016 at 7:41 PM, Sivakumaran S 
>>> wrote:
>>> >>
>>> >> Not an expert here, but the first step would be devote some time and
>>> >> identify which of these 112 factors are actually causative. Some
>>> domain
>>> >> knowledge of the data may be required. Then, you can start of with
>>> PCA.
>>> >>
>>> >> HTH,
>>> >>
>>> >> Regards,
>>> >>
>>> >> Sivakumaran S
>>> >>
>>> >> On 08-Aug-2016, at 3:01 PM, Tony Lane  wrote:
>>> >>
>>> >> Great question Rohit.  I am in my early days of ML as well and it
>>> would be
>>> >> great if we get some idea on this from other experts on this group.
>>> >>
>>> >> I know we can reduce dimensions by using PCA, but i think that does
>>> not
>>> >> allow us to understand which factors from the original are we using
>>> in the
>>> >> end.
>>> >>
>>> >> - Tony L.
>>> >>
>>> >> On Mon, Aug 8, 2016 at 5:12 PM, Rohit Chaddha <
>>> rohitchaddha1...@gmail.com>
>>> >> wrote:
>>> >>>
>>> >>>
>>> >>> I have a data-set where each data-point has 112 factors.
>>> >>>
>>> >>> I want to remove the factors which are not relevant, and say reduce
>>> to 20
>>> >>> factors out of these 112 and then do clustering of data-points using
>>> these
>>> >>> 20 factors.
>>> >>>
>>> >>> How do I do these and how do I figure out which of the 20 factors are
>>> >>> useful for analysis.
>>> >>>
>>> >>> I see SVD and PCA implementations, but I am not sure if these give
>>> which
>>> >>> elements are removed and which are remaining.
>>> >>>
>>> >>> Can someone please help me understand what to do here
>>> >>>
>>> >>> thanks,
>>> >>> -Rohit
>>> >>>
>>> >>
>>> >>
>>> >
>>>
>>
>>
>


SparkR error when repartition is called

2016-08-08 Thread Shane Lee
Hi All,
I am trying out SparkR 2.0 and have run into an issue with repartition. 
Here is the R code (essentially a port of the pi-calculating scala example in 
the spark package) that can reproduce the behavior:
schema <- structType(structField("input", "integer"), structField("output", 
"integer"))
library(magrittr)

len = 3000data.frame(n = 1:len) %>%as.DataFrame %>%
SparkR:::repartition(10L) %>% dapply(., function (df) { library(plyr) ddply(df, 
.(n), function (y)
 { data.frame(z =  { x1 = runif(1) * 2 - 1 y1 = runif(1) * 2 - 1 z = x1 * x1 + 
y1 * y1 if (z < 1) { 1L } else { 0L } }) }) } , schema ) %>%  
SparkR:::summarize(total = sum(.$output)) %>% collect * 4 / len
For me it runs fine as long as len is less than 5000, otherwise it errors out 
with the following message:
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :   
org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in 
stage 56.0 failed 4 times, most recent failure: Lost task 6.3 in stage 56.0 
(TID 899, LARBGDV-VM02): org.apache.spark.SparkException: R computation failed 
with Error in readBin(con, raw(), stringLen, endian = "big") :   invalid 'n' 
argumentCalls:  -> readBinExecution halted at 
org.apache.spark.api.r.RRunner.compute(RRunner.scala:108) at 
org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:59)
 at 
org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:29)
 at 
org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:178)
 at 
org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:175)
 at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
 at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$
If the repartition call is removed, it runs fine again, even with very large 
len.
After looking through the documentations and searching the web, I can't seem to 
find any clues how to fix this. Anybody has seen similary problem?
Thanks in advance for your help.
Shane


Re: Machine learning question (suing spark)- removing redundant factors while doing clustering

2016-08-08 Thread Peyman Mohajerian
You can try 'feature Importances' or 'feature selection' depending on what
else you want to do with the remaining features that's a possibility. Let's
say you are trying to do classification then some of the Spark Libraries
have a model parameter called 'featureImportances' that tell you which
feature(s) are more dominant in you classification, you can then run your
model again with the smaller set of features.
The two approaches are quite different, what I'm suggesting involves
training (supervised learning) in the context of a target function, with
SVD you are doing unsupervised learning.

On Mon, Aug 8, 2016 at 7:23 PM, Rohit Chaddha 
wrote:

> I would rather have less features to make better inferences on the data
> based on the smaller number of factors,
> Any suggestions Sean ?
>
> On Mon, Aug 8, 2016 at 11:37 PM, Sean Owen  wrote:
>
>> Yes, that's exactly what PCA is for as Sivakumaran noted. Do you
>> really want to select features or just obtain a lower-dimensional
>> representation of them, with less redundancy?
>>
>> On Mon, Aug 8, 2016 at 4:10 PM, Tony Lane  wrote:
>> > There must be an algorithmic way to figure out which of these factors
>> > contribute the least and remove them in the analysis.
>> > I am hoping same one can throw some insight on this.
>> >
>> > On Mon, Aug 8, 2016 at 7:41 PM, Sivakumaran S 
>> wrote:
>> >>
>> >> Not an expert here, but the first step would be devote some time and
>> >> identify which of these 112 factors are actually causative. Some domain
>> >> knowledge of the data may be required. Then, you can start of with PCA.
>> >>
>> >> HTH,
>> >>
>> >> Regards,
>> >>
>> >> Sivakumaran S
>> >>
>> >> On 08-Aug-2016, at 3:01 PM, Tony Lane  wrote:
>> >>
>> >> Great question Rohit.  I am in my early days of ML as well and it
>> would be
>> >> great if we get some idea on this from other experts on this group.
>> >>
>> >> I know we can reduce dimensions by using PCA, but i think that does not
>> >> allow us to understand which factors from the original are we using in
>> the
>> >> end.
>> >>
>> >> - Tony L.
>> >>
>> >> On Mon, Aug 8, 2016 at 5:12 PM, Rohit Chaddha <
>> rohitchaddha1...@gmail.com>
>> >> wrote:
>> >>>
>> >>>
>> >>> I have a data-set where each data-point has 112 factors.
>> >>>
>> >>> I want to remove the factors which are not relevant, and say reduce
>> to 20
>> >>> factors out of these 112 and then do clustering of data-points using
>> these
>> >>> 20 factors.
>> >>>
>> >>> How do I do these and how do I figure out which of the 20 factors are
>> >>> useful for analysis.
>> >>>
>> >>> I see SVD and PCA implementations, but I am not sure if these give
>> which
>> >>> elements are removed and which are remaining.
>> >>>
>> >>> Can someone please help me understand what to do here
>> >>>
>> >>> thanks,
>> >>> -Rohit
>> >>>
>> >>
>> >>
>> >
>>
>
>


Re: Machine learning question (suing spark)- removing redundant factors while doing clustering

2016-08-08 Thread Rohit Chaddha
I would rather have less features to make better inferences on the data
based on the smaller number of factors,
Any suggestions Sean ?

On Mon, Aug 8, 2016 at 11:37 PM, Sean Owen  wrote:

> Yes, that's exactly what PCA is for as Sivakumaran noted. Do you
> really want to select features or just obtain a lower-dimensional
> representation of them, with less redundancy?
>
> On Mon, Aug 8, 2016 at 4:10 PM, Tony Lane  wrote:
> > There must be an algorithmic way to figure out which of these factors
> > contribute the least and remove them in the analysis.
> > I am hoping same one can throw some insight on this.
> >
> > On Mon, Aug 8, 2016 at 7:41 PM, Sivakumaran S 
> wrote:
> >>
> >> Not an expert here, but the first step would be devote some time and
> >> identify which of these 112 factors are actually causative. Some domain
> >> knowledge of the data may be required. Then, you can start of with PCA.
> >>
> >> HTH,
> >>
> >> Regards,
> >>
> >> Sivakumaran S
> >>
> >> On 08-Aug-2016, at 3:01 PM, Tony Lane  wrote:
> >>
> >> Great question Rohit.  I am in my early days of ML as well and it would
> be
> >> great if we get some idea on this from other experts on this group.
> >>
> >> I know we can reduce dimensions by using PCA, but i think that does not
> >> allow us to understand which factors from the original are we using in
> the
> >> end.
> >>
> >> - Tony L.
> >>
> >> On Mon, Aug 8, 2016 at 5:12 PM, Rohit Chaddha <
> rohitchaddha1...@gmail.com>
> >> wrote:
> >>>
> >>>
> >>> I have a data-set where each data-point has 112 factors.
> >>>
> >>> I want to remove the factors which are not relevant, and say reduce to
> 20
> >>> factors out of these 112 and then do clustering of data-points using
> these
> >>> 20 factors.
> >>>
> >>> How do I do these and how do I figure out which of the 20 factors are
> >>> useful for analysis.
> >>>
> >>> I see SVD and PCA implementations, but I am not sure if these give
> which
> >>> elements are removed and which are remaining.
> >>>
> >>> Can someone please help me understand what to do here
> >>>
> >>> thanks,
> >>> -Rohit
> >>>
> >>
> >>
> >
>


答复: 答复: how to generate a column using mapParition and then add it back to the df?

2016-08-08 Thread 莫涛
Hi guha,

Thanks a lot!
This is perfectly what I want and I'll try to implement it.


MoTao

发件人: ayan guha 
发送时间: 2016年8月8日 18:05:37
收件人: 莫涛
抄送: ndj...@gmail.com; user@spark.apache.org
主题: Re: 答复: how to generate a column using mapParition and then add it back to 
the df?

Hi

I think you should modify initModel() function to getOrCreateModel() and create 
the model as singleton object. You may want to refer this 
link

On Mon, Aug 8, 2016 at 7:44 PM, 莫涛 
mailto:mo...@sensetime.com>> wrote:
Hi Ndjido,

Thanks for your reply.

Yes, it is good idea if the model can be broadcast.

I'm working with a built library (on Linux, say classifier.so and classifier.h) 
and it requires the model file is in the local file system.
As I don't have access to the library code, I write JNI to wrap the classifier.
The model file can be sent to each executor efficiently by addFile and getFile.
But initModel() is still expensive as it actually loads a local file into C++ 
heap memory, which is not serializable.

That's the reason I can not broadcast the model and I have to avoid load model 
as possible as I can.

Best


发件人: ndj...@gmail.com 
mailto:ndj...@gmail.com>>
发送时间: 2016年8月8日 17:16:27
收件人: 莫涛
抄送: user@spark.apache.org
主题: Re: how to generate a column using mapParition and then add it back to the 
df?


Hi MoTao,
What about broadcasting the model?

Cheers,
Ndjido.

> On 08 Aug 2016, at 11:00, MoTao 
> mailto:mo...@sensetime.com>> wrote:
>
> Hi all,
>
> I'm trying to append a column to a df.
> I understand that the new column must be created by
> 1) using literals,
> 2) transforming an existing column in df,
> or 3) generated from udf over this df
>
> In my case, the column to be appended is created by processing each row,
> like
>
> val df = spark.createDataFrame(Seq(1.0, 2.0, 3.0)).toDF("value")
> val func = udf {
>  v: Double => {
>val model = initModel()
>model.process(v)
>  }
> }
> val df2 = df.withColumn("valueWithBias", func(col("value")))
>
> This works fine. However, for performance reason, I want to avoid
> initModel() for each row.
> So I come with mapParitions, like
>
> val df = spark.createDataFrame(Seq(1.0, 2.0, 3.0)).toDF("value")
> val df2 = df.mapPartitions(rows => {
>  val model = initModel()
>  rows.map(row => model.process(row.getAs[Double](0)))
> })
> val df3 = df.withColumn("valueWithBias", df2.col("value")) // FAIL
>
> But this is wrong as a column of df2 *CANNOT* be appended to df.
>
> The only solution I got is to force mapPartitions to return a whole row
> instead of the new column,
> ( Something like "row => Row.fromSeq(row.toSeq ++
> Array(model.process(...)))" )
> which requires a lot of copy as well.
>
> I wonder how to deal with this problem with as few overhead as possible?
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-generate-a-column-using-mapParition-and-then-add-it-back-to-the-df-tp27493.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: 
> user-unsubscr...@spark.apache.org
>



--
Best Regards,
Ayan Guha


Re: Cumulative Sum function using Dataset API

2016-08-08 Thread Jon Barksdale
I don't think that would work properly, and would probably just give me the
sum for each partition. I'll give it a try when I get home just to be
certain.

To maybe explain the intent better, if I have a column (pre sorted) of
(1,2,3,4), then the cumulative sum would return (1,3,6,10).

Does that make sense? Naturally, if ordering a sum turns it into a
cumulative sum, I'll gladly use that :)

Jon
On Mon, Aug 8, 2016 at 4:55 PM ayan guha  wrote:

> You mean you are not able to use sum(col) over (partition by key order by
> some_col) ?
>
> On Tue, Aug 9, 2016 at 9:53 AM, jon  wrote:
>
>> Hi all,
>>
>> I'm trying to write a function that calculates a cumulative sum as a
>> column
>> using the Dataset API, and I'm a little stuck on the implementation.  From
>> what I can tell, UserDefinedAggregateFunctions don't seem to support
>> windowing clauses, which I think I need for this use case.  If I write a
>> function that extends from AggregateWindowFunction, I end up needing
>> classes
>> that are package private to the sql package, so I need to make my function
>> under the org.apache.spark.sql package, which just feels wrong.
>>
>> I've also considered writing a custom transformer, but haven't spend as
>> much
>> time reading through the code, so I don't know how easy or hard that would
>> be.
>>
>> TLDR; What's the best way to write a function that returns a value for
>> every
>> row, but has mutable state, and gets row in a specific order?
>>
>> Does anyone have any ideas, or examples?
>>
>> Thanks,
>>
>> Jon
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Cumulative-Sum-function-using-Dataset-API-tp27496.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Cumulative Sum function using Dataset API

2016-08-08 Thread ayan guha
You mean you are not able to use sum(col) over (partition by key order by
some_col) ?

On Tue, Aug 9, 2016 at 9:53 AM, jon  wrote:

> Hi all,
>
> I'm trying to write a function that calculates a cumulative sum as a column
> using the Dataset API, and I'm a little stuck on the implementation.  From
> what I can tell, UserDefinedAggregateFunctions don't seem to support
> windowing clauses, which I think I need for this use case.  If I write a
> function that extends from AggregateWindowFunction, I end up needing
> classes
> that are package private to the sql package, so I need to make my function
> under the org.apache.spark.sql package, which just feels wrong.
>
> I've also considered writing a custom transformer, but haven't spend as
> much
> time reading through the code, so I don't know how easy or hard that would
> be.
>
> TLDR; What's the best way to write a function that returns a value for
> every
> row, but has mutable state, and gets row in a specific order?
>
> Does anyone have any ideas, or examples?
>
> Thanks,
>
> Jon
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Cumulative-Sum-function-using-
> Dataset-API-tp27496.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Cumulative Sum function using Dataset API

2016-08-08 Thread jon
Hi all,

I'm trying to write a function that calculates a cumulative sum as a column
using the Dataset API, and I'm a little stuck on the implementation.  From
what I can tell, UserDefinedAggregateFunctions don't seem to support
windowing clauses, which I think I need for this use case.  If I write a
function that extends from AggregateWindowFunction, I end up needing classes
that are package private to the sql package, so I need to make my function
under the org.apache.spark.sql package, which just feels wrong.

I've also considered writing a custom transformer, but haven't spend as much
time reading through the code, so I don't know how easy or hard that would
be.

TLDR; What's the best way to write a function that returns a value for every
row, but has mutable state, and gets row in a specific order?

Does anyone have any ideas, or examples?

Thanks,

Jon




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cumulative-Sum-function-using-Dataset-API-tp27496.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Logistic regression formula string

2016-08-08 Thread Cesar
I have a data frame with four columns, label , feature_1, feature_2,
feature_3. Is there a simple way in the ML library to give me the weights
based in feature names? I can only get the weights, which make this simple
task complicated when one of my features is categorical.

I am looking for something similar to what R output does (where it clearly
indicates which weight corresponds to each feature name, including
categorical ones).



Thanks a lot !
-- 
Cesar Flores


Issue with temporary table in Spark 2

2016-08-08 Thread Mich Talebzadeh
Hi,

This used to work in Spark 1.6.1. I am trying in Spark 2

scala>  val a = df.filter(col("Transaction Date") > "").map(p =>
Accounts(p(0).toString,p(1).toString,p(2).toString,p(3).toString,p(4).toString,p(5).toString,p(6).toString,p(7).toString.toDouble))
a: org.apache.spark.sql.Dataset[Accounts] = [TransactionDate: string,
TransactionType: string ... 6 more fields]
scala> a.printSchema
root
 |-- TransactionDate: string (nullable = true)
 |-- TransactionType: string (nullable = true)
 |-- SortCode: string (nullable = true)
 |-- AccountNumber: string (nullable = true)
 |-- TransactionDescription: string (nullable = true)
 |-- DebitAmount: string (nullable = true)
 |-- CreditAmount: string (nullable = true)
 |-- Balance: double (nullable = true)

Now I register it as  a temptable

scala> a.registerTempTable("tmp")
scala> sql("select count(1) from tmp")
res35: org.apache.spark.sql.DataFrame = [count(1): bigint]

Now try to collect it. it falls over

scala> sql("select count(1) from tmp").collect
16/08/08 23:12:03 ERROR Executor: Exception in task 0.0 in stage 13.0 (TID
36)
java.lang.NullPointerException
at
$line72.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(:31)
at
$line72.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(:31)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/08/08 23:12:03 ERROR TaskSetManager: Task 0 in stage 13.0 failed 1
times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 13.0 failed 1 times, most recent failure: Lost task 0.0 in stage
13.0 (TID 36, localhost): java.lang.NullPointerException
at $anonfun$1.apply(:31)
at $anonfun$1.apply(:31)

Any ideas what is happening!

Thanks


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

2016-08-08 Thread Reynold Xin
The show thing was the result of an optimization that short-circuited any
real Spark computation when the input is a local collection, and the result
was simply the first few rows. That's why it completed without serializing
anything.

It is somewhat inconsistent. One way to eliminate the inconsistency is to
always serialize the query plan even for local execution. We did that back
in the days for the RDD code path, and we can do similar things for the SQL
code path. However, serialization is not free and it will slow down the
execution by small percentage.



On Tue, Aug 9, 2016 at 5:05 AM, Hao Ren  wrote:

> @Reynold
>
> Some questions to make things clear:
>
> 1. As nothing is really final in the JVM, is the generated code during
> the execution of `df.show()` different from the one of `df.filter($"key"
> === 2).show()` in my snippet ?
>
> 2. When `df.show()` is being executed, it seems that the 'notSer' object
> is not serialized (since no exception), instead the Int value in it is
> serialized. Is this correct ?
> As for me, this behavior is counterintuitive.
> The analogical problem would be a `RDD.map` 's closure contains
> 'notSer.value'. For example:
> 
> rdd.map {
>   case (key, value) => value + notSer.value
> }
> rdd.count
> 
> It should thrown a "Task not serializable" exception. But for dataframe,
> it is not the case because of reflection or unsafe.
>
> 3. I am wondering whether this "feature" of scala complier makes the
> DataFrame API unpredictable ? Any roadmap on this ?
> As a user, I can not expect that a `fitler` call before `show` crashes,
> while a simple `show` on the original df works.
>
> The workaround I can imagine is just to cache and materialize `df` by
> `df.cache.count()`, and then call `df.filter(...).show()`.
> It should work, just a little bit tedious.
>
>
>
> On Mon, Aug 8, 2016 at 10:00 PM, Reynold Xin  wrote:
>
>> That is unfortunately the way how Scala compiler captures (and defines)
>> closures. Nothing is really final in the JVM. You can always use reflection
>> or unsafe to modify the value of fields.
>>
>> On Mon, Aug 8, 2016 at 8:16 PM, Simon Scott <
>> simon.sc...@viavisolutions.com> wrote:
>>
>>> But does the “notSer” object have to be serialized?
>>>
>>>
>>>
>>> The object is immutable by the definition of A, so the only thing that
>>> needs to be serialized is the (immutable) Int value? And Ints are
>>> serializable?
>>>
>>>
>>>
>>> Just thinking out loud
>>>
>>>
>>>
>>> Simon Scott
>>>
>>>
>>>
>>> Research Developer @ viavisolutions.com
>>>
>>>
>>>
>>> *From:* Hao Ren [mailto:inv...@gmail.com]
>>> *Sent:* 08 August 2016 09:03
>>> *To:* Muthu Jayakumar 
>>> *Cc:* user ; dev 
>>> *Subject:* Re: [SPARK-2.0][SQL] UDF containing non-serializable object
>>> does not work as expected
>>>
>>>
>>>
>>> Yes, it is.
>>>
>>> You can define a udf like that.
>>>
>>> Basically, it's a udf Int => Int which is a closure contains a non
>>> serializable object.
>>>
>>> The latter should cause Task not serializable exception.
>>>
>>>
>>>
>>> Hao
>>>
>>>
>>>
>>> On Mon, Aug 8, 2016 at 5:08 AM, Muthu Jayakumar 
>>> wrote:
>>>
>>> Hello Hao Ren,
>>>
>>>
>>>
>>> Doesn't the code...
>>>
>>>
>>>
>>> val add = udf {
>>>
>>>   (a: Int) => a + notSer.value
>>>
>>> }
>>>
>>> Mean UDF function that Int => Int ?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Muthu
>>>
>>>
>>>
>>> On Sun, Aug 7, 2016 at 2:31 PM, Hao Ren  wrote:
>>>
>>> I am playing with spark 2.0
>>>
>>> What I tried to test is:
>>>
>>>
>>>
>>> Create a UDF in which there is a non serializable object.
>>>
>>> What I expected is when this UDF is called during materializing the
>>> dataFrame where the UDF is used in "select", an task non serializable
>>> exception should be thrown.
>>>
>>> It depends also which "action" is called on that dataframe.
>>>
>>>
>>>
>>> Here is the code for reproducing the pb:
>>>
>>>
>>>
>>> 
>>>
>>> object DataFrameSerDeTest extends App {
>>>
>>>
>>>
>>>   class A(val value: Int) // It is not serializable
>>>
>>>
>>>
>>>   def run() = {
>>>
>>> val spark = SparkSession
>>>
>>>   .builder()
>>>
>>>   .appName("DataFrameSerDeTest")
>>>
>>>   .master("local[*]")
>>>
>>>   .getOrCreate()
>>>
>>>
>>>
>>> import org.apache.spark.sql.functions.udf
>>>
>>> import spark.sqlContext.implicits._
>>>
>>>
>>>
>>> val notSer = new A(2)
>>>
>>> val add = udf {
>>>
>>>   (a: Int) => a + notSer.value
>>>
>>> }
>>>
>>> val df = spark.createDataFrame(Seq(
>>>
>>>   (1, 2),
>>>
>>>   (2, 2),
>>>
>>>   (3, 2),
>>>
>>>   (4, 2)
>>>
>>> )).toDF("key", "value")
>>>
>>>   .select($"key", add($"value").as("added"))
>>>
>>>
>>>
>>> df.show() // *It should not work because the udf contains a
>>> non-serializable object, but it works*
>>>
>>>
>>>
>>> df.filter($"key" === 2).show() // *It does not work as expected
>>> (org.apache.spark.SparkException: Task not serializable)*
>>>
>>>   }
>>>
>>>
>>>
>>>   run()
>>>
>>> }
>>>
>>> 

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded when using UDFs in SparkSQL (Spark 2.0.0)

2016-08-08 Thread Davies Liu
On Mon, Aug 8, 2016 at 2:24 PM, Zoltan Fedor  wrote:
> Hi all,
>
> I have an interesting issue trying to use UDFs from SparkSQL in Spark 2.0.0
> using pyspark.
>
> There is a big table (5.6 Billion rows, 450Gb in memory) loaded into 300
> executors's memory in SparkSQL, on which we would do some calculation using
> UDFs in pyspark.
> If I run my SQL on only a portion of the data (filtering by one of the
> attributes), let's say 800 million records, then all works well. But when I
> run the same SQL on all the data, then I receive
> "java.lang.OutOfMemoryError: GC overhead limit exceeded" from basically all
> of the executors.
>
> It seems to me that pyspark UDFs in SparkSQL might have a memory leak,
> causing this "GC overhead limit being exceeded".
>
> Details:
>
> - using Spark 2.0.0 on a Hadoop YARN cluster
>
> - 300 executors, each with 2 CPU cores and 8Gb memory (
> spark.yarn.executor.memoryOverhead=6400 )

Does this mean you only have 1.6G memory for executor (others left for Python) ?
The cached table could take 1.5G, it means almost nothing left for other things.

Python UDF do requires some buffering in JVM, the size of buffering depends on
how much rows are under processing by Python process.

> - a table of 5.6 Billions rows loaded into the memory of the executors
> (taking up 450Gb of memory), partitioned evenly across the executors
>
> - creating even the simplest UDF in SparkSQL causes 'GC overhead limit
> exceeded' error if running on all records. Running the same on a smaller
> dataset (~800 million rows) does succeed. If no UDF, the query succeed on
> the whole dataset.
>
> - simplified pyspark code:
>
> from pyspark.sql.types import StringType
>
> def test_udf(var):
> """test udf that will always return a"""
> return "a"
> sqlContext.registerFunction("test_udf", test_udf, StringType())
>
> sqlContext.sql("""CACHE TABLE ma""")
>
> results_df = sqlContext.sql("""SELECT SOURCE, SOURCE_SYSTEM,
> test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP,
> ROUND(1.0 - (levenshtein(STANDARD_ACCOUNT_CITY_SRC,
> STANDARD_ACCOUNT_CITY_SRC)
>  /
> CASE WHEN LENGTH (STANDARD_ACCOUNT_CITY_SRC)>LENGTH
> (STANDARD_ACCOUNT_CITY_SRC)
> THEN LENGTH (STANDARD_ACCOUNT_CITY_SRC)
> ELSE LENGTH (STANDARD_ACCOUNT_CITY_SRC)
>END),2) AS SCORE_ED_STANDARD_ACCOUNT_CITY,
> STANDARD_ACCOUNT_STATE_SRC, STANDARD_ACCOUNT_STATE_UNIV
> FROM ma""")
>
> results_df.registerTempTable("m")
> sqlContext.cacheTable("m")
>
> results_df = sqlContext.sql("""SELECT COUNT(*) FROM m""")
> print(results_df.take(1))
>
>
> - the error thrown on the executors:
>
> 16/08/08 15:38:17 ERROR util.Utils: Uncaught exception in thread stdout
> writer for /hadoop/cloudera/parcels/Anaconda/bin/python
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at
> org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:503)
> at
> org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:61)
> at
> org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
> at
> org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at
> scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1076)
> at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1091)
> at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1129)
> at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
> at
> org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
> at
> org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
> 16/08/08 15:38:17 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED
> SIGNAL TERM
>
>
> Has anybody experienced these "GC overhead limit exceeded" errors with
> pyspark UDFs before?
>
> Thanks,
> Zoltan
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



java.lang.OutOfMemoryError: GC overhead limit exceeded when using UDFs in SparkSQL (Spark 2.0.0)

2016-08-08 Thread Zoltan Fedor
Hi all,

I have an interesting issue trying to use UDFs from SparkSQL in Spark 2.0.0
using pyspark.

There is a big table (5.6 Billion rows, 450Gb in memory) loaded into 300
executors's memory in SparkSQL, on which we would do some calculation using
UDFs in pyspark.
If I run my SQL on only a portion of the data (filtering by one of the
attributes), let's say 800 million records, then all works well. But when I
run the same SQL on all the data, then I receive "*java.lang.OutOfMemoryError:
GC overhead limit exceeded"* from basically all of the executors.

It seems to me that pyspark UDFs in SparkSQL might have a memory leak,
causing this "GC overhead limit being exceeded".

Details:

- using Spark 2.0.0 on a Hadoop YARN cluster

- 300 executors, each with 2 CPU cores and 8Gb memory (
spark.yarn.executor.memoryOverhead=6400 )

- a table of 5.6 Billions rows loaded into the memory of the executors
(taking up 450Gb of memory), partitioned evenly across the executors

- creating even the simplest UDF in SparkSQL causes 'GC overhead limit
exceeded' error if running on all records. Running the same on a smaller
dataset (~800 million rows) does succeed. If no UDF, the query succeed on
the whole dataset.

- simplified pyspark code:

*from pyspark.sql.types import StringType*

*def test_udf(var):*
*"""test udf that will always return a"""*
*return "a"*
*sqlContext.registerFunction("test_udf", test_udf, StringType())*

*sqlContext.sql("""CACHE TABLE ma""")*

*results_df = sqlContext.sql("""SELECT SOURCE, SOURCE_SYSTEM,*
*test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP,*
* ROUND(1.0 - (levenshtein(STANDARD_ACCOUNT_CITY_SRC,
STANDARD_ACCOUNT_CITY_SRC) *
*  / *
* CASE WHEN LENGTH (STANDARD_ACCOUNT_CITY_SRC)>LENGTH
(STANDARD_ACCOUNT_CITY_SRC)*
* THEN LENGTH (STANDARD_ACCOUNT_CITY_SRC)*
* ELSE LENGTH (STANDARD_ACCOUNT_CITY_SRC)*
*END),2) AS SCORE_ED_STANDARD_ACCOUNT_CITY,*
* STANDARD_ACCOUNT_STATE_SRC, STANDARD_ACCOUNT_STATE_UNIV*
* FROM ma""")*

*results_df.registerTempTable("m")*
*sqlContext.cacheTable("m")*

*results_df = sqlContext.sql("""SELECT COUNT(*) FROM m""")*
*print(results_df.take(1))*


- the error thrown on the executors:

*16/08/08 15:38:17 ERROR util.Utils: Uncaught exception in thread stdout
writer for /hadoop/cloudera/parcels/Anaconda/bin/python*
*java.lang.OutOfMemoryError: GC overhead limit exceeded*
* at
org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:503)*
* at
org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:61)*
* at
org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)*
* at
org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)*
* at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)*
* at
scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1076)*
* at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1091)*
* at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1129)*
* at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)*
* at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)*
* at scala.collection.Iterator$class.foreach(Iterator.scala:893)*
* at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)*
* at
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)*
* at
org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)*
* at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)*
* at
org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)*
*16/08/08 15:38:17 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED
SIGNAL TERM*


Has anybody experienced these "*GC overhead limit exceeded*" errors with
pyspark UDFs before?

Thanks,
Zoltan


Re: Getting a TreeNode Exception while saving into Hadoop

2016-08-08 Thread max square
Here's the complete stacktrace -
https://gist.github.com/rohann/649b0fcc9d5062ef792eddebf5a315c1

For reference, here's the complete function -

  def saveAsLatest(df: DataFrame, fileSystem: FileSystem, bakDir: String) =
{

fileSystem.rename(new Path(bakDir + latest), new Path(bakDir + "/" +
ScalaUtil.currentDateTimeString))

df.write.format("com.databricks.spark.csv").options(Map("mode" ->
"DROPMALFORMED", "delimiter" -> "\t", "header" -> "true")).save(bakDir +
latest)

  }

On Mon, Aug 8, 2016 at 3:41 PM, Ted Yu  wrote:

> Mind showing the complete stack trace ?
>
> Thanks
>
> On Mon, Aug 8, 2016 at 12:30 PM, max square 
> wrote:
>
>> Thanks Ted for the prompt reply.
>>
>> There are three or four DFs that are coming from various sources and I'm
>> doing a unionAll on them.
>>
>> val placesProcessed = placesUnchanged.unionAll(placesAddedWithMerchantId
>> ).unionAll(placesUpdatedFromHotelsWithMerchantId).unionAll(placesUpdat
>> edFromRestaurantsWithMerchantId).unionAll(placesChanged)
>>
>> I'm using Spark 1.6.2.
>>
>> On Mon, Aug 8, 2016 at 3:11 PM, Ted Yu  wrote:
>>
>>> Can you show the code snippet for unionAll operation ?
>>>
>>> Which Spark release do you use ?
>>>
>>> BTW please use user@spark.apache.org in the future.
>>>
>>> On Mon, Aug 8, 2016 at 11:47 AM, max square 
>>> wrote:
>>>
 Hey guys,

 I'm trying to save Dataframe in CSV format after performing unionAll
 operations on it.
 But I get this exception -

 Exception in thread "main" org.apache.spark.sql.catalyst.
 errors.package$TreeNodeException: execute, tree:
 TungstenExchange hashpartitioning(mId#430,200)

 I'm saving it by

 df.write.format("com.databricks.spark.csv").options(Map("mode" ->
 "DROPMALFORMED", "delimiter" -> "\t", "header" -> "true")).save(bakDir
 + latest)

 It works perfectly if I don't do the unionAll operation.
 I see that the format isn't different by printing the part of the
 results.

 Any help regarding this would be appreciated.


>>>
>>
>


Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

2016-08-08 Thread Hao Ren
@Reynold

Some questions to make things clear:

1. As nothing is really final in the JVM, is the generated code during the
execution of `df.show()` different from the one of `df.filter($"key" ===
2).show()` in my snippet ?

2. When `df.show()` is being executed, it seems that the 'notSer' object is
not serialized (since no exception), instead the Int value in it is
serialized. Is this correct ?
As for me, this behavior is counterintuitive.
The analogical problem would be a `RDD.map` 's closure contains
'notSer.value'. For example:

rdd.map {
  case (key, value) => value + notSer.value
}
rdd.count

It should thrown a "Task not serializable" exception. But for dataframe, it
is not the case because of reflection or unsafe.

3. I am wondering whether this "feature" of scala complier makes the
DataFrame API unpredictable ? Any roadmap on this ?
As a user, I can not expect that a `fitler` call before `show` crashes,
while a simple `show` on the original df works.

The workaround I can imagine is just to cache and materialize `df` by
`df.cache.count()`, and then call `df.filter(...).show()`.
It should work, just a little bit tedious.



On Mon, Aug 8, 2016 at 10:00 PM, Reynold Xin  wrote:

> That is unfortunately the way how Scala compiler captures (and defines)
> closures. Nothing is really final in the JVM. You can always use reflection
> or unsafe to modify the value of fields.
>
> On Mon, Aug 8, 2016 at 8:16 PM, Simon Scott  com> wrote:
>
>> But does the “notSer” object have to be serialized?
>>
>>
>>
>> The object is immutable by the definition of A, so the only thing that
>> needs to be serialized is the (immutable) Int value? And Ints are
>> serializable?
>>
>>
>>
>> Just thinking out loud
>>
>>
>>
>> Simon Scott
>>
>>
>>
>> Research Developer @ viavisolutions.com
>>
>>
>>
>> *From:* Hao Ren [mailto:inv...@gmail.com]
>> *Sent:* 08 August 2016 09:03
>> *To:* Muthu Jayakumar 
>> *Cc:* user ; dev 
>> *Subject:* Re: [SPARK-2.0][SQL] UDF containing non-serializable object
>> does not work as expected
>>
>>
>>
>> Yes, it is.
>>
>> You can define a udf like that.
>>
>> Basically, it's a udf Int => Int which is a closure contains a non
>> serializable object.
>>
>> The latter should cause Task not serializable exception.
>>
>>
>>
>> Hao
>>
>>
>>
>> On Mon, Aug 8, 2016 at 5:08 AM, Muthu Jayakumar 
>> wrote:
>>
>> Hello Hao Ren,
>>
>>
>>
>> Doesn't the code...
>>
>>
>>
>> val add = udf {
>>
>>   (a: Int) => a + notSer.value
>>
>> }
>>
>> Mean UDF function that Int => Int ?
>>
>>
>>
>> Thanks,
>>
>> Muthu
>>
>>
>>
>> On Sun, Aug 7, 2016 at 2:31 PM, Hao Ren  wrote:
>>
>> I am playing with spark 2.0
>>
>> What I tried to test is:
>>
>>
>>
>> Create a UDF in which there is a non serializable object.
>>
>> What I expected is when this UDF is called during materializing the
>> dataFrame where the UDF is used in "select", an task non serializable
>> exception should be thrown.
>>
>> It depends also which "action" is called on that dataframe.
>>
>>
>>
>> Here is the code for reproducing the pb:
>>
>>
>>
>> 
>>
>> object DataFrameSerDeTest extends App {
>>
>>
>>
>>   class A(val value: Int) // It is not serializable
>>
>>
>>
>>   def run() = {
>>
>> val spark = SparkSession
>>
>>   .builder()
>>
>>   .appName("DataFrameSerDeTest")
>>
>>   .master("local[*]")
>>
>>   .getOrCreate()
>>
>>
>>
>> import org.apache.spark.sql.functions.udf
>>
>> import spark.sqlContext.implicits._
>>
>>
>>
>> val notSer = new A(2)
>>
>> val add = udf {
>>
>>   (a: Int) => a + notSer.value
>>
>> }
>>
>> val df = spark.createDataFrame(Seq(
>>
>>   (1, 2),
>>
>>   (2, 2),
>>
>>   (3, 2),
>>
>>   (4, 2)
>>
>> )).toDF("key", "value")
>>
>>   .select($"key", add($"value").as("added"))
>>
>>
>>
>> df.show() // *It should not work because the udf contains a
>> non-serializable object, but it works*
>>
>>
>>
>> df.filter($"key" === 2).show() // *It does not work as expected
>> (org.apache.spark.SparkException: Task not serializable)*
>>
>>   }
>>
>>
>>
>>   run()
>>
>> }
>>
>> 
>>
>>
>>
>> Also, I tried collect(), count(), first(), limit(). All of them worked
>> without non-serializable exceptions.
>>
>> It seems only filter() throws the exception. (feature or bug ?)
>>
>>
>>
>> Any ideas ? Or I just messed things up ?
>>
>> Any help is highly appreciated.
>>
>>
>>
>> --
>>
>> Hao Ren
>>
>>
>>
>> Data Engineer @ leboncoin
>>
>>
>>
>> Paris, France
>>
>>
>>
>>
>>
>>
>>
>> --
>>
>> Hao Ren
>>
>>
>>
>> Data Engineer @ leboncoin
>>
>>
>>
>> Paris, France
>>
>
>


-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


Re: zip for pyspark

2016-08-08 Thread Ewan Leith
If you build a normal python egg file with the dependencies, you can execute 
that like you are executing a .py file with  --py-files

Thanks,
Ewan

On 8 Aug 2016 3:44 p.m., pseudo oduesp  wrote:
hi,
how i can export all project on pyspark like zip   from local session to 
cluster and deploy with spark submit  i mean i have a large project with all 
dependances and i want create zip containing all of dependecs and deploy it on 
cluster



Re: Symbol HasInputCol is inaccesible from this place

2016-08-08 Thread janardhan shetty
Can some experts shed light on this one? Still facing issues with extends
HasInputCol and DefaultParamsWritable

On Mon, Aug 8, 2016 at 9:56 AM, janardhan shetty 
wrote:

> you mean is it deprecated ?
>
> On Mon, Aug 8, 2016 at 5:02 AM, Strange, Nick 
> wrote:
>
>> What possible reason do they have to think its fragmentation?
>>
>>
>>
>> *From:* janardhan shetty [mailto:janardhan...@gmail.com]
>> *Sent:* Saturday, August 06, 2016 2:01 PM
>> *To:* Ted Yu
>> *Cc:* user
>> *Subject:* Re: Symbol HasInputCol is inaccesible from this place
>>
>>
>>
>> Yes seems like, wondering if this can be made public in order to develop
>> custom transformers or any other alternatives ?
>>
>>
>>
>> On Sat, Aug 6, 2016 at 10:07 AM, Ted Yu  wrote:
>>
>> Is it because HasInputCol is private ?
>>
>>
>>
>> private[ml] trait HasInputCol extends Params {
>>
>>
>>
>> On Thu, Aug 4, 2016 at 1:18 PM, janardhan shetty 
>> wrote:
>>
>> Version : 2.0.0-preview
>>
>>
>> import org.apache.spark.ml.param._
>> import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
>>
>>
>> class CustomTransformer(override val uid: String) extends Transformer
>> with HasInputCol with HasOutputCol with DefaultParamsWritableimport
>> org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
>> HasInputCol, HasOutputCol}
>>
>> *Error in IntelliJ *
>> Symbol HasInputCol is inaccessible from this place
>>
>>  similairly for HasOutputCol and DefaultParamsWritable
>>
>> Any thoughts on this error as it is not allowing the compile
>>
>>
>>
>>
>>
>>
>>
>
>


Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

2016-08-08 Thread Reynold Xin
That is unfortunately the way how Scala compiler captures (and defines)
closures. Nothing is really final in the JVM. You can always use reflection
or unsafe to modify the value of fields.

On Mon, Aug 8, 2016 at 8:16 PM, Simon Scott 
wrote:

> But does the “notSer” object have to be serialized?
>
>
>
> The object is immutable by the definition of A, so the only thing that
> needs to be serialized is the (immutable) Int value? And Ints are
> serializable?
>
>
>
> Just thinking out loud
>
>
>
> Simon Scott
>
>
>
> Research Developer @ viavisolutions.com
>
>
>
> *From:* Hao Ren [mailto:inv...@gmail.com]
> *Sent:* 08 August 2016 09:03
> *To:* Muthu Jayakumar 
> *Cc:* user ; dev 
> *Subject:* Re: [SPARK-2.0][SQL] UDF containing non-serializable object
> does not work as expected
>
>
>
> Yes, it is.
>
> You can define a udf like that.
>
> Basically, it's a udf Int => Int which is a closure contains a non
> serializable object.
>
> The latter should cause Task not serializable exception.
>
>
>
> Hao
>
>
>
> On Mon, Aug 8, 2016 at 5:08 AM, Muthu Jayakumar 
> wrote:
>
> Hello Hao Ren,
>
>
>
> Doesn't the code...
>
>
>
> val add = udf {
>
>   (a: Int) => a + notSer.value
>
> }
>
> Mean UDF function that Int => Int ?
>
>
>
> Thanks,
>
> Muthu
>
>
>
> On Sun, Aug 7, 2016 at 2:31 PM, Hao Ren  wrote:
>
> I am playing with spark 2.0
>
> What I tried to test is:
>
>
>
> Create a UDF in which there is a non serializable object.
>
> What I expected is when this UDF is called during materializing the
> dataFrame where the UDF is used in "select", an task non serializable
> exception should be thrown.
>
> It depends also which "action" is called on that dataframe.
>
>
>
> Here is the code for reproducing the pb:
>
>
>
> 
>
> object DataFrameSerDeTest extends App {
>
>
>
>   class A(val value: Int) // It is not serializable
>
>
>
>   def run() = {
>
> val spark = SparkSession
>
>   .builder()
>
>   .appName("DataFrameSerDeTest")
>
>   .master("local[*]")
>
>   .getOrCreate()
>
>
>
> import org.apache.spark.sql.functions.udf
>
> import spark.sqlContext.implicits._
>
>
>
> val notSer = new A(2)
>
> val add = udf {
>
>   (a: Int) => a + notSer.value
>
> }
>
> val df = spark.createDataFrame(Seq(
>
>   (1, 2),
>
>   (2, 2),
>
>   (3, 2),
>
>   (4, 2)
>
> )).toDF("key", "value")
>
>   .select($"key", add($"value").as("added"))
>
>
>
> df.show() // *It should not work because the udf contains a
> non-serializable object, but it works*
>
>
>
> df.filter($"key" === 2).show() // *It does not work as expected
> (org.apache.spark.SparkException: Task not serializable)*
>
>   }
>
>
>
>   run()
>
> }
>
> 
>
>
>
> Also, I tried collect(), count(), first(), limit(). All of them worked
> without non-serializable exceptions.
>
> It seems only filter() throws the exception. (feature or bug ?)
>
>
>
> Any ideas ? Or I just messed things up ?
>
> Any help is highly appreciated.
>
>
>
> --
>
> Hao Ren
>
>
>
> Data Engineer @ leboncoin
>
>
>
> Paris, France
>
>
>
>
>
>
>
> --
>
> Hao Ren
>
>
>
> Data Engineer @ leboncoin
>
>
>
> Paris, France
>


Re: Getting a TreeNode Exception while saving into Hadoop

2016-08-08 Thread Ted Yu
Mind showing the complete stack trace ?

Thanks

On Mon, Aug 8, 2016 at 12:30 PM, max square  wrote:

> Thanks Ted for the prompt reply.
>
> There are three or four DFs that are coming from various sources and I'm
> doing a unionAll on them.
>
> val placesProcessed = placesUnchanged.unionAll(placesAddedWithMerchantId).
> unionAll(placesUpdatedFromHotelsWithMerchantId).unionAll(pla
> cesUpdatedFromRestaurantsWithMerchantId).unionAll(placesChanged)
>
> I'm using Spark 1.6.2.
>
> On Mon, Aug 8, 2016 at 3:11 PM, Ted Yu  wrote:
>
>> Can you show the code snippet for unionAll operation ?
>>
>> Which Spark release do you use ?
>>
>> BTW please use user@spark.apache.org in the future.
>>
>> On Mon, Aug 8, 2016 at 11:47 AM, max square 
>> wrote:
>>
>>> Hey guys,
>>>
>>> I'm trying to save Dataframe in CSV format after performing unionAll
>>> operations on it.
>>> But I get this exception -
>>>
>>> Exception in thread "main" org.apache.spark.sql.catalyst.
>>> errors.package$TreeNodeException: execute, tree:
>>> TungstenExchange hashpartitioning(mId#430,200)
>>>
>>> I'm saving it by
>>>
>>> df.write.format("com.databricks.spark.csv").options(Map("mode" ->
>>> "DROPMALFORMED", "delimiter" -> "\t", "header" -> "true")).save(bakDir
>>> + latest)
>>>
>>> It works perfectly if I don't do the unionAll operation.
>>> I see that the format isn't different by printing the part of the
>>> results.
>>>
>>> Any help regarding this would be appreciated.
>>>
>>>
>>
>


Re: Getting a TreeNode Exception while saving into Hadoop

2016-08-08 Thread max square
Thanks Ted for the prompt reply.

There are three or four DFs that are coming from various sources and I'm
doing a unionAll on them.

val placesProcessed = placesUnchanged.unionAll(placesAddedWithMerchantId
).unionAll(placesUpdatedFromHotelsWithMerchantId).unionAll(
placesUpdatedFromRestaurantsWithMerchantId).unionAll(placesChanged)

I'm using Spark 1.6.2.

On Mon, Aug 8, 2016 at 3:11 PM, Ted Yu  wrote:

> Can you show the code snippet for unionAll operation ?
>
> Which Spark release do you use ?
>
> BTW please use user@spark.apache.org in the future.
>
> On Mon, Aug 8, 2016 at 11:47 AM, max square 
> wrote:
>
>> Hey guys,
>>
>> I'm trying to save Dataframe in CSV format after performing unionAll
>> operations on it.
>> But I get this exception -
>>
>> Exception in thread "main" org.apache.spark.sql.catalyst.
>> errors.package$TreeNodeException: execute, tree:
>> TungstenExchange hashpartitioning(mId#430,200)
>>
>> I'm saving it by
>>
>> df.write.format("com.databricks.spark.csv").options(Map("mode" ->
>> "DROPMALFORMED", "delimiter" -> "\t", "header" -> "true")).save(bakDir +
>> latest)
>>
>> It works perfectly if I don't do the unionAll operation.
>> I see that the format isn't different by printing the part of the
>> results.
>>
>> Any help regarding this would be appreciated.
>>
>>
>


Re: SPARK SQL READING FROM HIVE

2016-08-08 Thread Mich Talebzadeh
Unfortunately it is the case for now

spark-sql> show create table payees;
Error in query: Failed to execute SHOW CREATE TABLE against table
`accounts`.`payees`, which is created by Hive and uses the following
unsupported feature(s)
 - bucketing;

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 8 August 2016 at 20:12, manish jaiswal  wrote:

> correct its creating delta file in hdfs.but after compaction it merge all
> data and create extra directory where all bucketed data present.( i am able
> to read data from hive but not from sparksql).
>


Re: Spark join and large temp files

2016-08-08 Thread Yong Zhang
Join requires shuffling. The problem is that you have to shuffle 1.5T data, 
which caused problem on your disk usage. Another way is to broadcast the 1.5G 
small dataset, so there is no shuffle requirement for 1.5T dataset. But you 
need to make sure you have enough memory.


Can you try to increase your partition count, which will make each partition 
contains less data for your 1.5T, so the whole disk usage of split data maybe 
less.


But keep in mind you should always have enough space of your disk to handle the 
job you plan to run.


Yong



From: Ashic Mahtab 
Sent: Monday, August 8, 2016 2:53 PM
To: Deepak Sharma
Cc: Apache Spark
Subject: RE: Spark join and large temp files

Hi Deepak,
Thanks for the response.

Registering the temp tables didn't help. Here's what I have:

val a = sqlContext..read.parquet(...).select("eid.id", 
"name").withColumnRenamed("eid.id", "id")
val b = sqlContext.read.parquet(...).select("id", "number")

a.registerTempTable("a")
b.registerTempTable("b")

val results = sqlContext.sql("SELECT x.id, x.name, y.number FROM a x join b y 
on x.id=y.id)

results.write.parquet(...)

Is there something I'm missing?

Cheers,
Ashic.


From: deepakmc...@gmail.com
Date: Tue, 9 Aug 2016 00:01:32 +0530
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: user@spark.apache.org

Register you dataframes as temp tables and then try the join on the temp table.
This should resolve your issue.

Thanks
Deepak

On Mon, Aug 8, 2016 at 11:47 PM, Ashic Mahtab 
mailto:as...@live.com>> wrote:
Hello,
We have two parquet inputs of the following form:

a: id:String, Name:String  (1.5TB)
b: id:String, Number:Int  (1.3GB)

We need to join these two to get (id, Number, Name). We've tried two approaches:

a.join(b, Seq("id"), "right_outer")

where a and b are dataframes. We also tried taking the rdds, mapping them to 
pair rdds with id as the key, and then joining. What we're seeing is that temp 
file usage is increasing on the join stage, and filling up our disks, causing 
the job to crash. Is there a way to join these two data sets without 
well...crashing?

Note, the ids are unique, and there's a one to one mapping between the two 
datasets.

Any help would be appreciated.

-Ashic.







--
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: SPARK SQL READING FROM HIVE

2016-08-08 Thread manish jaiswal
correct its creating delta file in hdfs.but after compaction it merge all
data and create extra directory where all bucketed data present.( i am able
to read data from hive but not from sparksql).


Re: Getting a TreeNode Exception while saving into Hadoop

2016-08-08 Thread Ted Yu
Can you show the code snippet for unionAll operation ?

Which Spark release do you use ?

BTW please use user@spark.apache.org in the future.

On Mon, Aug 8, 2016 at 11:47 AM, max square  wrote:

> Hey guys,
>
> I'm trying to save Dataframe in CSV format after performing unionAll
> operations on it.
> But I get this exception -
>
> Exception in thread "main" 
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
> execute, tree:
> TungstenExchange hashpartitioning(mId#430,200)
>
> I'm saving it by
>
> df.write.format("com.databricks.spark.csv").options(Map("mode" ->
> "DROPMALFORMED", "delimiter" -> "\t", "header" -> "true")).save(bakDir +
> latest)
>
> It works perfectly if I don't do the unionAll operation.
> I see that the format isn't different by printing the part of the results.
>
> Any help regarding this would be appreciated.
>
>


Re: SPARK SQL READING FROM HIVE

2016-08-08 Thread manish jaiswal
i am using spark 1.6.0 and hive 1.2.1.

reading from hive transactional table is not supported yet by sparl sql?

On Tue, Aug 9, 2016 at 12:18 AM, manish jaiswal 
wrote:

> Hi,
>
> I am not able to read data from hive transactional table using sparksql.
> (i don't want read via hive jdbc)
>
>
>
> Please help.
>


Re: SPARK SQL READING FROM HIVE

2016-08-08 Thread Mich Talebzadeh
I suspect this is happening because the underlying table has got delta
files in it due to updates etc and spark cannot read it and requires
compaction

Can you do

hdfs dfs -ls  


Also can you query a normal table in hive (meaning non transactional)

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 8 August 2016 at 19:51, Deepak Sharma  wrote:

> Can you please post the code snippet and the error you are getting ?
>
> -Deepak
>
> On 9 Aug 2016 12:18 am, "manish jaiswal"  wrote:
>
>> Hi,
>>
>> I am not able to read data from hive transactional table using sparksql.
>> (i don't want read via hive jdbc)
>>
>>
>>
>> Please help.
>>
>


RE: Spark join and large temp files

2016-08-08 Thread Ashic Mahtab
Hi Deepak,Thanks for the response. 
Registering the temp tables didn't help. Here's what I have:
val a = sqlContext..read.parquet(...).select("eid.id", 
"name").withColumnRenamed("eid.id", "id")val b = 
sqlContext.read.parquet(...).select("id", "number")
a.registerTempTable("a")b.registerTempTable("b")
val results = sqlContext.sql("SELECT x.id, x.name, y.number FROM a x join b y 
on x.id=y.id)
results.write.parquet(...)
Is there something I'm missing?
Cheers,Ashic.
From: deepakmc...@gmail.com
Date: Tue, 9 Aug 2016 00:01:32 +0530
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: user@spark.apache.org

Register you dataframes as temp tables and then try the join on the temp 
table.This should resolve your issue.
ThanksDeepak
On Mon, Aug 8, 2016 at 11:47 PM, Ashic Mahtab  wrote:



Hello,We have two parquet inputs of the following form:
a: id:String, Name:String  (1.5TB)b: id:String, Number:Int  (1.3GB)
We need to join these two to get (id, Number, Name). We've tried two approaches:
a.join(b, Seq("id"), "right_outer")
where a and b are dataframes. We also tried taking the rdds, mapping them to 
pair rdds with id as the key, and then joining. What we're seeing is that temp 
file usage is increasing on the join stage, and filling up our disks, causing 
the job to crash. Is there a way to join these two data sets without 
well...crashing?
Note, the ids are unique, and there's a one to one mapping between the two 
datasets. 
Any help would be appreciated.
-Ashic. 



  


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net
  

Re: SPARK SQL READING FROM HIVE

2016-08-08 Thread Deepak Sharma
Can you please post the code snippet and the error you are getting ?

-Deepak

On 9 Aug 2016 12:18 am, "manish jaiswal"  wrote:

> Hi,
>
> I am not able to read data from hive transactional table using sparksql.
> (i don't want read via hive jdbc)
>
>
>
> Please help.
>


Re: SPARK SQL READING FROM HIVE

2016-08-08 Thread Mich Talebzadeh
Which version of Spark and Hive are you using?

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 8 August 2016 at 19:48, manish jaiswal  wrote:

> Hi,
>
> I am not able to read data from hive transactional table using sparksql.
> (i don't want read via hive jdbc)
>
>
>
> Please help.
>


SPARK SQL READING FROM HIVE

2016-08-08 Thread manish jaiswal
Hi,

I am not able to read data from hive transactional table using sparksql.
(i don't want read via hive jdbc)



Please help.


Getting a TreeNode Exception while saving into Hadoop

2016-08-08 Thread max square
Hey guys,

I'm trying to save Dataframe in CSV format after performing unionAll
operations on it.
But I get this exception -

Exception in thread "main"
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute,
tree:
TungstenExchange hashpartitioning(mId#430,200)

I'm saving it by

df.write.format("com.databricks.spark.csv").options(Map("mode" ->
"DROPMALFORMED", "delimiter" -> "\t", "header" -> "true")).save(bakDir +
latest)

It works perfectly if I don't do the unionAll operation.
I see that the format isn't different by printing the part of the results.

Any help regarding this would be appreciated.


Re: Have I done everything correctly when subscribing to Spark User List

2016-08-08 Thread Sivakumaran S
Does it have anything to do with the fact that the mail address is displayed as 
user @spark.apache.org ? There is a space before ‘@‘. 
This is as received in my mail client.

Sivakumaran


> On 08-Aug-2016, at 7:42 PM, Chris Mattmann  wrote:
> 
> Weird!
> 
> 
> 
> 
> 
> On 8/8/16, 11:10 AM, "Sean Owen"  wrote:
> 
>> I also don't know what's going on with the "This post has NOT been
>> accepted by the mailing list yet" message, because actually the
>> messages always do post. In fact this has been sent to the list 4
>> times:
>> 
>> https://www.mail-archive.com/search?l=user%40spark.apache.org&q=dueckm&submit.x=0&submit.y=0
>> 
>> On Mon, Aug 8, 2016 at 3:03 PM, Chris Mattmann  wrote:
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On 8/8/16, 2:03 AM, "matthias.du...@fiduciagad.de" 
>>>  wrote:
>>> 
 Hello,
 
 I write to you because I am not really sure whether I did everything right 
 when registering and subscribing to the spark user list.
 
 I posted the appended question to Spark User list after subscribing and 
 receiving the "WELCOME to user@spark.apache.org" mail from 
 "user-h...@spark.apache.org".
 But this post is still in state "This post has NOT been accepted by the 
 mailing list yet.".
 
 Is this because I forgot something to do or did something wrong with my 
 user account (dueckm)? Or is it because no member of the Spark User List 
 reacted to that post yet?
 
 Thanks a lot for yout help.
 
 Matthias
 
 Fiducia & GAD IT AG | www.fiduciagad.de
 AG Frankfurt a. M. HRB 102381 | Sitz der Gesellschaft: Hahnstr. 48, 60528 
 Frankfurt a. M. | USt-IdNr. DE 143582320
 Vorstand: Klaus-Peter Bruns (Vorsitzender), Claus-Dieter Toben (stv. 
 Vorsitzender),
 
 Jens-Olaf Bartels, Martin Beyer, Jörg Dreinhöfer, Wolfgang Eckert, Carsten 
 Pfläging, Jörg Staff
 Vorsitzender des Aufsichtsrats: Jürgen Brinkmann
 
 - Weitergeleitet von Matthias Dück/M/FAG/FIDUCIA/DE am 08.08.2016 
 10:57 -
 
 Von: dueckm 
 An: user@spark.apache.org
 Datum: 04.08.2016 13:27
 Betreff: Are join/groupBy operations with wide Java Beans using Dataset 
 API much slower than using RDD API?
 
 
 
 
 
 Hello,
 
 I built a prototype that uses join and groupBy operations via Spark RDD 
 API.
 Recently I migrated it to the Dataset API. Now it runs much slower than 
 with
 the original RDD implementation.
 Did I do something wrong here? Or is this a price I have to pay for the 
 more
 convienient API?
 Is there a known solution to deal with this effect (eg configuration via
 "spark.sql.shuffle.partitions" - but now could I determine the correct
 value)?
 In my prototype I use Java Beans with a lot of attributes. Does this slow
 down Spark-operations with Datasets?
 
 Here I have an simple example, that shows the difference:
 JoinGroupByTest.zip
 
 - I build 2 RDDs and join and group them. Afterwards I count and display 
 the
 joined RDDs.  (Method de.testrddds.JoinGroupByTest.joinAndGroupViaRDD() )
 - When I do the same actions with Datasets it takes approximately 40 times
 as long (Methodd e.testrddds.JoinGroupByTest.joinAndGroupViaDatasets()).
 
 Thank you very much for your help.
 Matthias
 
 PS1: excuse me for sending this post more than once, but I am new to this
 mailing list and probably did something wrong when registering/subscribing,
 so my previous postings have not been accepted ...
 
 PS2: See the appended screenshots taken from Spark UI (jobs 0/1 belong to
 RDD implementation, jobs 2/3 to Dataset):
 
 
 
 
 
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Are-join-groupBy-operations-with-wide-Java-Beans-using-Dataset-API-much-slower-than-using-RDD-API-tp27473.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org
 
>>> 
>>> 
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> 
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 



Re: Have I done everything correctly when subscribing to Spark User List

2016-08-08 Thread Chris Mattmann
Weird!





On 8/8/16, 11:10 AM, "Sean Owen"  wrote:

>I also don't know what's going on with the "This post has NOT been
>accepted by the mailing list yet" message, because actually the
>messages always do post. In fact this has been sent to the list 4
>times:
>
>https://www.mail-archive.com/search?l=user%40spark.apache.org&q=dueckm&submit.x=0&submit.y=0
>
>On Mon, Aug 8, 2016 at 3:03 PM, Chris Mattmann  wrote:
>>
>>
>>
>>
>>
>> On 8/8/16, 2:03 AM, "matthias.du...@fiduciagad.de" 
>>  wrote:
>>
>>>Hello,
>>>
>>>I write to you because I am not really sure whether I did everything right 
>>>when registering and subscribing to the spark user list.
>>>
>>>I posted the appended question to Spark User list after subscribing and 
>>>receiving the "WELCOME to user@spark.apache.org" mail from 
>>>"user-h...@spark.apache.org".
>>> But this post is still in state "This post has NOT been accepted by the 
>>> mailing list yet.".
>>>
>>>Is this because I forgot something to do or did something wrong with my user 
>>>account (dueckm)? Or is it because no member of the Spark User List reacted 
>>>to that post yet?
>>>
>>>Thanks a lot for yout help.
>>>
>>>Matthias
>>>
>>>Fiducia & GAD IT AG | www.fiduciagad.de
>>>AG Frankfurt a. M. HRB 102381 | Sitz der Gesellschaft: Hahnstr. 48, 60528 
>>>Frankfurt a. M. | USt-IdNr. DE 143582320
>>>Vorstand: Klaus-Peter Bruns (Vorsitzender), Claus-Dieter Toben (stv. 
>>>Vorsitzender),
>>>
>>>Jens-Olaf Bartels, Martin Beyer, Jörg Dreinhöfer, Wolfgang Eckert, Carsten 
>>>Pfläging, Jörg Staff
>>>Vorsitzender des Aufsichtsrats: Jürgen Brinkmann
>>>
>>>- Weitergeleitet von Matthias Dück/M/FAG/FIDUCIA/DE am 08.08.2016 10:57 
>>>-
>>>
>>>Von: dueckm 
>>>An: user@spark.apache.org
>>>Datum: 04.08.2016 13:27
>>>Betreff: Are join/groupBy operations with wide Java Beans using Dataset API 
>>>much slower than using RDD API?
>>>
>>>
>>>
>>>
>>>
>>>Hello,
>>>
>>>I built a prototype that uses join and groupBy operations via Spark RDD API.
>>>Recently I migrated it to the Dataset API. Now it runs much slower than with
>>>the original RDD implementation.
>>>Did I do something wrong here? Or is this a price I have to pay for the more
>>>convienient API?
>>>Is there a known solution to deal with this effect (eg configuration via
>>>"spark.sql.shuffle.partitions" - but now could I determine the correct
>>>value)?
>>>In my prototype I use Java Beans with a lot of attributes. Does this slow
>>>down Spark-operations with Datasets?
>>>
>>>Here I have an simple example, that shows the difference:
>>>JoinGroupByTest.zip
>>>
>>>- I build 2 RDDs and join and group them. Afterwards I count and display the
>>>joined RDDs.  (Method de.testrddds.JoinGroupByTest.joinAndGroupViaRDD() )
>>>- When I do the same actions with Datasets it takes approximately 40 times
>>>as long (Methodd e.testrddds.JoinGroupByTest.joinAndGroupViaDatasets()).
>>>
>>>Thank you very much for your help.
>>>Matthias
>>>
>>>PS1: excuse me for sending this post more than once, but I am new to this
>>>mailing list and probably did something wrong when registering/subscribing,
>>>so my previous postings have not been accepted ...
>>>
>>>PS2: See the appended screenshots taken from Spark UI (jobs 0/1 belong to
>>>RDD implementation, jobs 2/3 to Dataset):
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>--
>>>View this message in context: 
>>>http://apache-spark-user-list.1001560.n3.nabble.com/Are-join-groupBy-operations-with-wide-Java-Beans-using-Dataset-API-much-slower-than-using-RDD-API-tp27473.html
>>>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>>-
>>>To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark join and large temp files

2016-08-08 Thread Deepak Sharma
Register you dataframes as temp tables and then try the join on the temp
table.
This should resolve your issue.

Thanks
Deepak

On Mon, Aug 8, 2016 at 11:47 PM, Ashic Mahtab  wrote:

> Hello,
> We have two parquet inputs of the following form:
>
> a: id:String, Name:String  (1.5TB)
> b: id:String, Number:Int  (1.3GB)
>
> We need to join these two to get (id, Number, Name). We've tried two
> approaches:
>
> a.join(b, Seq("id"), "right_outer")
>
> where a and b are dataframes. We also tried taking the rdds, mapping them
> to pair rdds with id as the key, and then joining. What we're seeing is
> that temp file usage is increasing on the join stage, and filling up our
> disks, causing the job to crash. Is there a way to join these two data sets
> without well...crashing?
>
> Note, the ids are unique, and there's a one to one mapping between the two
> datasets.
>
> Any help would be appreciated.
>
> -Ashic.
>
>
>
>
>


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: Have I done everything correctly when subscribing to Spark User List

2016-08-08 Thread Ovidiu-Cristian MARCU
Probably the yellow warning message can be confusing even more than not 
receiving an answer/opinion on his post.

Best,
Ovidiu
> On 08 Aug 2016, at 20:10, Sean Owen  wrote:
> 
> I also don't know what's going on with the "This post has NOT been
> accepted by the mailing list yet" message, because actually the
> messages always do post. In fact this has been sent to the list 4
> times:
> 
> https://www.mail-archive.com/search?l=user%40spark.apache.org&q=dueckm&submit.x=0&submit.y=0
> 
> On Mon, Aug 8, 2016 at 3:03 PM, Chris Mattmann  wrote:
>> 
>> 
>> 
>> 
>> 
>> On 8/8/16, 2:03 AM, "matthias.du...@fiduciagad.de" 
>>  wrote:
>> 
>>> Hello,
>>> 
>>> I write to you because I am not really sure whether I did everything right 
>>> when registering and subscribing to the spark user list.
>>> 
>>> I posted the appended question to Spark User list after subscribing and 
>>> receiving the "WELCOME to user@spark.apache.org" mail from 
>>> "user-h...@spark.apache.org".
>>> But this post is still in state "This post has NOT been accepted by the 
>>> mailing list yet.".
>>> 
>>> Is this because I forgot something to do or did something wrong with my 
>>> user account (dueckm)? Or is it because no member of the Spark User List 
>>> reacted to that post yet?
>>> 
>>> Thanks a lot for yout help.
>>> 
>>> Matthias
>>> 
>>> Fiducia & GAD IT AG | www.fiduciagad.de
>>> AG Frankfurt a. M. HRB 102381 | Sitz der Gesellschaft: Hahnstr. 48, 60528 
>>> Frankfurt a. M. | USt-IdNr. DE 143582320
>>> Vorstand: Klaus-Peter Bruns (Vorsitzender), Claus-Dieter Toben (stv. 
>>> Vorsitzender),
>>> 
>>> Jens-Olaf Bartels, Martin Beyer, Jörg Dreinhöfer, Wolfgang Eckert, Carsten 
>>> Pfläging, Jörg Staff
>>> Vorsitzender des Aufsichtsrats: Jürgen Brinkmann
>>> 
>>> - Weitergeleitet von Matthias Dück/M/FAG/FIDUCIA/DE am 08.08.2016 10:57 
>>> -
>>> 
>>> Von: dueckm 
>>> An: user@spark.apache.org
>>> Datum: 04.08.2016 13:27
>>> Betreff: Are join/groupBy operations with wide Java Beans using Dataset API 
>>> much slower than using RDD API?
>>> 
>>> 
>>> 
>>> 
>>> 
>>> Hello,
>>> 
>>> I built a prototype that uses join and groupBy operations via Spark RDD API.
>>> Recently I migrated it to the Dataset API. Now it runs much slower than with
>>> the original RDD implementation.
>>> Did I do something wrong here? Or is this a price I have to pay for the more
>>> convienient API?
>>> Is there a known solution to deal with this effect (eg configuration via
>>> "spark.sql.shuffle.partitions" - but now could I determine the correct
>>> value)?
>>> In my prototype I use Java Beans with a lot of attributes. Does this slow
>>> down Spark-operations with Datasets?
>>> 
>>> Here I have an simple example, that shows the difference:
>>> JoinGroupByTest.zip
>>> 
>>> - I build 2 RDDs and join and group them. Afterwards I count and display the
>>> joined RDDs.  (Method de.testrddds.JoinGroupByTest.joinAndGroupViaRDD() )
>>> - When I do the same actions with Datasets it takes approximately 40 times
>>> as long (Methodd e.testrddds.JoinGroupByTest.joinAndGroupViaDatasets()).
>>> 
>>> Thank you very much for your help.
>>> Matthias
>>> 
>>> PS1: excuse me for sending this post more than once, but I am new to this
>>> mailing list and probably did something wrong when registering/subscribing,
>>> so my previous postings have not been accepted ...
>>> 
>>> PS2: See the appended screenshots taken from Spark UI (jobs 0/1 belong to
>>> RDD implementation, jobs 2/3 to Dataset):
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> --
>>> View this message in context: 
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Are-join-groupBy-operations-with-wide-Java-Beans-using-Dataset-API-much-slower-than-using-RDD-API-tp27473.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>> 
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> 
>> 
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark join and large temp files

2016-08-08 Thread Ashic Mahtab
Hello,We have two parquet inputs of the following form:
a: id:String, Name:String  (1.5TB)b: id:String, Number:Int  (1.3GB)
We need to join these two to get (id, Number, Name). We've tried two approaches:
a.join(b, Seq("id"), "right_outer")
where a and b are dataframes. We also tried taking the rdds, mapping them to 
pair rdds with id as the key, and then joining. What we're seeing is that temp 
file usage is increasing on the join stage, and filling up our disks, causing 
the job to crash. Is there a way to join these two data sets without 
well...crashing?
Note, the ids are unique, and there's a one to one mapping between the two 
datasets. 
Any help would be appreciated.
-Ashic. 



  

Re: FW: Have I done everything correctly when subscribing to Spark User List

2016-08-08 Thread Sean Owen
I also don't know what's going on with the "This post has NOT been
accepted by the mailing list yet" message, because actually the
messages always do post. In fact this has been sent to the list 4
times:

https://www.mail-archive.com/search?l=user%40spark.apache.org&q=dueckm&submit.x=0&submit.y=0

On Mon, Aug 8, 2016 at 3:03 PM, Chris Mattmann  wrote:
>
>
>
>
>
> On 8/8/16, 2:03 AM, "matthias.du...@fiduciagad.de" 
>  wrote:
>
>>Hello,
>>
>>I write to you because I am not really sure whether I did everything right 
>>when registering and subscribing to the spark user list.
>>
>>I posted the appended question to Spark User list after subscribing and 
>>receiving the "WELCOME to user@spark.apache.org" mail from 
>>"user-h...@spark.apache.org".
>> But this post is still in state "This post has NOT been accepted by the 
>> mailing list yet.".
>>
>>Is this because I forgot something to do or did something wrong with my user 
>>account (dueckm)? Or is it because no member of the Spark User List reacted 
>>to that post yet?
>>
>>Thanks a lot for yout help.
>>
>>Matthias
>>
>>Fiducia & GAD IT AG | www.fiduciagad.de
>>AG Frankfurt a. M. HRB 102381 | Sitz der Gesellschaft: Hahnstr. 48, 60528 
>>Frankfurt a. M. | USt-IdNr. DE 143582320
>>Vorstand: Klaus-Peter Bruns (Vorsitzender), Claus-Dieter Toben (stv. 
>>Vorsitzender),
>>
>>Jens-Olaf Bartels, Martin Beyer, Jörg Dreinhöfer, Wolfgang Eckert, Carsten 
>>Pfläging, Jörg Staff
>>Vorsitzender des Aufsichtsrats: Jürgen Brinkmann
>>
>>- Weitergeleitet von Matthias Dück/M/FAG/FIDUCIA/DE am 08.08.2016 10:57 
>>-
>>
>>Von: dueckm 
>>An: user@spark.apache.org
>>Datum: 04.08.2016 13:27
>>Betreff: Are join/groupBy operations with wide Java Beans using Dataset API 
>>much slower than using RDD API?
>>
>>
>>
>>
>>
>>Hello,
>>
>>I built a prototype that uses join and groupBy operations via Spark RDD API.
>>Recently I migrated it to the Dataset API. Now it runs much slower than with
>>the original RDD implementation.
>>Did I do something wrong here? Or is this a price I have to pay for the more
>>convienient API?
>>Is there a known solution to deal with this effect (eg configuration via
>>"spark.sql.shuffle.partitions" - but now could I determine the correct
>>value)?
>>In my prototype I use Java Beans with a lot of attributes. Does this slow
>>down Spark-operations with Datasets?
>>
>>Here I have an simple example, that shows the difference:
>>JoinGroupByTest.zip
>>
>>- I build 2 RDDs and join and group them. Afterwards I count and display the
>>joined RDDs.  (Method de.testrddds.JoinGroupByTest.joinAndGroupViaRDD() )
>>- When I do the same actions with Datasets it takes approximately 40 times
>>as long (Methodd e.testrddds.JoinGroupByTest.joinAndGroupViaDatasets()).
>>
>>Thank you very much for your help.
>>Matthias
>>
>>PS1: excuse me for sending this post more than once, but I am new to this
>>mailing list and probably did something wrong when registering/subscribing,
>>so my previous postings have not been accepted ...
>>
>>PS2: See the appended screenshots taken from Spark UI (jobs 0/1 belong to
>>RDD implementation, jobs 2/3 to Dataset):
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>--
>>View this message in context: 
>>http://apache-spark-user-list.1001560.n3.nabble.com/Are-join-groupBy-operations-with-wide-Java-Beans-using-Dataset-API-much-slower-than-using-RDD-API-tp27473.html
>>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>-
>>To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Machine learning question (suing spark)- removing redundant factors while doing clustering

2016-08-08 Thread Sean Owen
Yes, that's exactly what PCA is for as Sivakumaran noted. Do you
really want to select features or just obtain a lower-dimensional
representation of them, with less redundancy?

On Mon, Aug 8, 2016 at 4:10 PM, Tony Lane  wrote:
> There must be an algorithmic way to figure out which of these factors
> contribute the least and remove them in the analysis.
> I am hoping same one can throw some insight on this.
>
> On Mon, Aug 8, 2016 at 7:41 PM, Sivakumaran S  wrote:
>>
>> Not an expert here, but the first step would be devote some time and
>> identify which of these 112 factors are actually causative. Some domain
>> knowledge of the data may be required. Then, you can start of with PCA.
>>
>> HTH,
>>
>> Regards,
>>
>> Sivakumaran S
>>
>> On 08-Aug-2016, at 3:01 PM, Tony Lane  wrote:
>>
>> Great question Rohit.  I am in my early days of ML as well and it would be
>> great if we get some idea on this from other experts on this group.
>>
>> I know we can reduce dimensions by using PCA, but i think that does not
>> allow us to understand which factors from the original are we using in the
>> end.
>>
>> - Tony L.
>>
>> On Mon, Aug 8, 2016 at 5:12 PM, Rohit Chaddha 
>> wrote:
>>>
>>>
>>> I have a data-set where each data-point has 112 factors.
>>>
>>> I want to remove the factors which are not relevant, and say reduce to 20
>>> factors out of these 112 and then do clustering of data-points using these
>>> 20 factors.
>>>
>>> How do I do these and how do I figure out which of the 20 factors are
>>> useful for analysis.
>>>
>>> I see SVD and PCA implementations, but I am not sure if these give which
>>> elements are removed and which are remaining.
>>>
>>> Can someone please help me understand what to do here
>>>
>>> thanks,
>>> -Rohit
>>>
>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



using matrix as column datatype in SparkSQL Dataframe

2016-08-08 Thread Vadla, Karthik
Hello all,


I'm trying to load set of medical images(dicom) into spark SQL dataframe. Here 
each image is loaded into matrix column of dataframe. I see spark recently 
added MatrixUDT to support this kind of cases, but i don't find a sample for 
using matrix as column in dataframe.

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala

Can anyone help me with this.

Really appreciate your help.

Thanks

Karthik Vadla



Re: Source format for Apache Spark logo

2016-08-08 Thread Sean Owen
In case the attachments don't come through, BTW those are indeed
downloadable from the directory http://spark.apache.org/images/

On Mon, Aug 8, 2016 at 6:09 PM, Sivakumaran S  wrote:
> Found these from the spark.apache.org website.
>
> HTH,
>
> Sivakumaran S
>
>
>
>
>
> On 08-Aug-2016, at 5:24 PM, michael.ar...@gdata-adan.de wrote:
>
> Hi,
>
> for a presentation I’d apreciate a vector version of the Apache Spark logo,
> unfortunately I cannot find it. Is the Logo available in a vector format
> somewhere?
>  s
>
>
> 
> Virus checked by G Data MailSecurity
> Version: AVA 25.7800 dated 08.08.2016
> Virus news: www.antiviruslab.com
>
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Symbol HasInputCol is inaccesible from this place

2016-08-08 Thread janardhan shetty
you mean is it deprecated ?

On Mon, Aug 8, 2016 at 5:02 AM, Strange, Nick  wrote:

> What possible reason do they have to think its fragmentation?
>
>
>
> *From:* janardhan shetty [mailto:janardhan...@gmail.com]
> *Sent:* Saturday, August 06, 2016 2:01 PM
> *To:* Ted Yu
> *Cc:* user
> *Subject:* Re: Symbol HasInputCol is inaccesible from this place
>
>
>
> Yes seems like, wondering if this can be made public in order to develop
> custom transformers or any other alternatives ?
>
>
>
> On Sat, Aug 6, 2016 at 10:07 AM, Ted Yu  wrote:
>
> Is it because HasInputCol is private ?
>
>
>
> private[ml] trait HasInputCol extends Params {
>
>
>
> On Thu, Aug 4, 2016 at 1:18 PM, janardhan shetty 
> wrote:
>
> Version : 2.0.0-preview
>
>
> import org.apache.spark.ml.param._
> import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
>
>
> class CustomTransformer(override val uid: String) extends Transformer with
> HasInputCol with HasOutputCol with DefaultParamsWritableimport
> org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
> HasInputCol, HasOutputCol}
>
> *Error in IntelliJ *
> Symbol HasInputCol is inaccessible from this place
>
>  similairly for HasOutputCol and DefaultParamsWritable
>
> Any thoughts on this error as it is not allowing the compile
>
>
>
>
>
>
>


Source format for Apache Spark logo

2016-08-08 Thread Michael.Arndt
Hi,

for a presentation I'd apreciate a vector version of the Apache Spark logo, 
unfortunately I cannot find it. Is the Logo available in a vector format 
somewhere?



Virus checked by G Data MailSecurity
Version: AVA 25.7800 dated 08.08.2016
Virus news: www.antiviruslab.com

Re: Best practises around spark-scala

2016-08-08 Thread Deepak Sharma
Thanks Vaquar.
My intention is to find something which can help stress test the code in
spark , measure the performance and suggest some improvements.
Is there any such framework or tool I can use here ?

Thanks
Deepak

On 8 Aug 2016 9:14 pm, "vaquar khan"  wrote:

> I found following links are good as I am using same.
>
> http://spark.apache.org/docs/latest/tuning.html
>
> https://spark-summit.org/2014/testing-spark-best-practices/
>
> Regards,
> Vaquar khan
>
> On 8 Aug 2016 10:11, "Deepak Sharma"  wrote:
>
>> Hi All,
>> Can anyone please give any documents that may be there around spark-scala
>> best practises?
>>
>> --
>> Thanks
>> Deepak
>> www.bigdatabig.com
>> www.keosha.net
>>
>


Re: Best practises around spark-scala

2016-08-08 Thread vaquar khan
I found following links are good as I am using same.

http://spark.apache.org/docs/latest/tuning.html

https://spark-summit.org/2014/testing-spark-best-practices/

Regards,
Vaquar khan

On 8 Aug 2016 10:11, "Deepak Sharma"  wrote:

> Hi All,
> Can anyone please give any documents that may be there around spark-scala
> best practises?
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


Best practises around spark-scala

2016-08-08 Thread Deepak Sharma
Hi All,
Can anyone please give any documents that may be there around spark-scala
best practises?

-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: Machine learning question (suing spark)- removing redundant factors while doing clustering

2016-08-08 Thread Tony Lane
There must be an algorithmic way to figure out which of these factors
contribute the least and remove them in the analysis.
I am hoping same one can throw some insight on this.

On Mon, Aug 8, 2016 at 7:41 PM, Sivakumaran S  wrote:

> Not an expert here, but the first step would be devote some time and
> identify which of these 112 factors are actually causative. Some domain
> knowledge of the data may be required. Then, you can start of with PCA.
>
> HTH,
>
> Regards,
>
> Sivakumaran S
>
> On 08-Aug-2016, at 3:01 PM, Tony Lane  wrote:
>
> Great question Rohit.  I am in my early days of ML as well and it would be
> great if we get some idea on this from other experts on this group.
>
> I know we can reduce dimensions by using PCA, but i think that does not
> allow us to understand which factors from the original are we using in the
> end.
>
> - Tony L.
>
> On Mon, Aug 8, 2016 at 5:12 PM, Rohit Chaddha 
> wrote:
>
>>
>> I have a data-set where each data-point has 112 factors.
>>
>> I want to remove the factors which are not relevant, and say reduce to 20
>> factors out of these 112 and then do clustering of data-points using these
>> 20 factors.
>>
>> How do I do these and how do I figure out which of the 20 factors are
>> useful for analysis.
>>
>> I see SVD and PCA implementations, but I am not sure if these give which
>> elements are removed and which are remaining.
>>
>> Can someone please help me understand what to do here
>>
>> thanks,
>> -Rohit
>>
>>
>
>


Unsubscribe

2016-08-08 Thread bijuna


Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



zip for pyspark

2016-08-08 Thread pseudo oduesp
hi,
how i can export all project on pyspark like zip   from local session to
cluster and deploy with spark submit  i mean i have a large project with
all dependances and i want create zip containing all of dependecs and
deploy it on cluster


Re: Machine learning question (suing spark)- removing redundant factors while doing clustering

2016-08-08 Thread Sivakumaran S
Not an expert here, but the first step would be devote some time and identify 
which of these 112 factors are actually causative. Some domain knowledge of the 
data may be required. Then, you can start of with PCA. 

HTH,

Regards,

Sivakumaran S
> On 08-Aug-2016, at 3:01 PM, Tony Lane  wrote:
> 
> Great question Rohit.  I am in my early days of ML as well and it would be 
> great if we get some idea on this from other experts on this group. 
> 
> I know we can reduce dimensions by using PCA, but i think that does not allow 
> us to understand which factors from the original are we using in the end. 
> 
> - Tony L.
> 
> On Mon, Aug 8, 2016 at 5:12 PM, Rohit Chaddha  > wrote:
> 
> I have a data-set where each data-point has 112 factors. 
> 
> I want to remove the factors which are not relevant, and say reduce to 20 
> factors out of these 112 and then do clustering of data-points using these 20 
> factors.
> 
> How do I do these and how do I figure out which of the 20 factors are useful 
> for analysis. 
> 
> I see SVD and PCA implementations, but I am not sure if these give which 
> elements are removed and which are remaining. 
> 
> Can someone please help me understand what to do here 
> 
> thanks,
> -Rohit 
> 
> 



FW: Have I done everything correctly when subscribing to Spark User List

2016-08-08 Thread Chris Mattmann





On 8/8/16, 2:03 AM, "matthias.du...@fiduciagad.de" 
 wrote:

>Hello,
>
>I write to you because I am not really sure whether I did everything right 
>when registering and subscribing to the spark user list.
>
>I posted the appended question to Spark User list after subscribing and 
>receiving the "WELCOME to user@spark.apache.org" mail from 
>"user-h...@spark.apache.org".
> But this post is still in state "This post has NOT been accepted by the 
> mailing list yet.".
>
>Is this because I forgot something to do or did something wrong with my user 
>account (dueckm)? Or is it because no member of the Spark User List reacted to 
>that post yet?
>
>Thanks a lot for yout help.
>
>Matthias
>
>Fiducia & GAD IT AG | www.fiduciagad.de
>AG Frankfurt a. M. HRB 102381 | Sitz der Gesellschaft: Hahnstr. 48, 60528 
>Frankfurt a. M. | USt-IdNr. DE 143582320
>Vorstand: Klaus-Peter Bruns (Vorsitzender), Claus-Dieter Toben (stv. 
>Vorsitzender),
>
>Jens-Olaf Bartels, Martin Beyer, Jörg Dreinhöfer, Wolfgang Eckert, Carsten 
>Pfläging, Jörg Staff
>Vorsitzender des Aufsichtsrats: Jürgen Brinkmann
>
>- Weitergeleitet von Matthias Dück/M/FAG/FIDUCIA/DE am 08.08.2016 10:57 
>-
>
>Von: dueckm 
>An: user@spark.apache.org
>Datum: 04.08.2016 13:27
>Betreff: Are join/groupBy operations with wide Java Beans using Dataset API 
>much slower than using RDD API?
>
>
>
>
>
>Hello,
>
>I built a prototype that uses join and groupBy operations via Spark RDD API.
>Recently I migrated it to the Dataset API. Now it runs much slower than with
>the original RDD implementation. 
>Did I do something wrong here? Or is this a price I have to pay for the more
>convienient API?
>Is there a known solution to deal with this effect (eg configuration via
>"spark.sql.shuffle.partitions" - but now could I determine the correct
>value)?
>In my prototype I use Java Beans with a lot of attributes. Does this slow
>down Spark-operations with Datasets?
>
>Here I have an simple example, that shows the difference: 
>JoinGroupByTest.zip
>
>  
>- I build 2 RDDs and join and group them. Afterwards I count and display the
>joined RDDs.  (Method de.testrddds.JoinGroupByTest.joinAndGroupViaRDD() )
>- When I do the same actions with Datasets it takes approximately 40 times
>as long (Methodd e.testrddds.JoinGroupByTest.joinAndGroupViaDatasets()).
>
>Thank you very much for your help.
>Matthias
>
>PS1: excuse me for sending this post more than once, but I am new to this
>mailing list and probably did something wrong when registering/subscribing,
>so my previous postings have not been accepted ...
>
>PS2: See the appended screenshots taken from Spark UI (jobs 0/1 belong to
>RDD implementation, jobs 2/3 to Dataset):
>
>
>
>
>
>
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/Are-join-groupBy-operations-with-wide-Java-Beans-using-Dataset-API-much-slower-than-using-RDD-API-tp27473.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>-
>To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Machine learning question (suing spark)- removing redundant factors while doing clustering

2016-08-08 Thread Tony Lane
Great question Rohit.  I am in my early days of ML as well and it would be
great if we get some idea on this from other experts on this group.

I know we can reduce dimensions by using PCA, but i think that does not
allow us to understand which factors from the original are we using in the
end.

- Tony L.

On Mon, Aug 8, 2016 at 5:12 PM, Rohit Chaddha 
wrote:

>
> I have a data-set where each data-point has 112 factors.
>
> I want to remove the factors which are not relevant, and say reduce to 20
> factors out of these 112 and then do clustering of data-points using these
> 20 factors.
>
> How do I do these and how do I figure out which of the 20 factors are
> useful for analysis.
>
> I see SVD and PCA implementations, but I am not sure if these give which
> elements are removed and which are remaining.
>
> Can someone please help me understand what to do here
>
> thanks,
> -Rohit
>
>


Spark 2 and existing code with sqlContext

2016-08-08 Thread Mich Talebzadeh
Hi,

In Spark 1.6.1 this worked

scala> sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/
HH:mm:ss.ss') ").collect.foreach(println)
[08/08/2016 14:07:22.22]

Spark 2 should give due to backward compatibility?

But I get

cala> sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/
HH:mm:ss.ss') ").collect.foreach(println)
:24: error: not found: value sqlContext
   sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/
HH:mm:ss.ss') ").collect.foreach(println)

Now we can change it to HiveContext and it works

However, what is the best solution if any as we have loads of sqlContext in
our code?

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


RE: Symbol HasInputCol is inaccesible from this place

2016-08-08 Thread Strange, Nick
What possible reason do they have to think its fragmentation?

From: janardhan shetty [mailto:janardhan...@gmail.com]
Sent: Saturday, August 06, 2016 2:01 PM
To: Ted Yu
Cc: user
Subject: Re: Symbol HasInputCol is inaccesible from this place

Yes seems like, wondering if this can be made public in order to develop custom 
transformers or any other alternatives ?

On Sat, Aug 6, 2016 at 10:07 AM, Ted Yu 
mailto:yuzhih...@gmail.com>> wrote:
Is it because HasInputCol is private ?

private[ml] trait HasInputCol extends Params {

On Thu, Aug 4, 2016 at 1:18 PM, janardhan shetty 
mailto:janardhan...@gmail.com>> wrote:
Version : 2.0.0-preview

import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}


class CustomTransformer(override val uid: String) extends Transformer with 
HasInputCol with HasOutputCol with DefaultParamsWritableimport 
org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
HasInputCol, HasOutputCol}

Error in IntelliJ
Symbol HasInputCol is inaccessible from this place
 similairly for HasOutputCol and DefaultParamsWritable
Any thoughts on this error as it is not allowing the compile





Machine learning question (suing spark)- removing redundant factors while doing clustering

2016-08-08 Thread Rohit Chaddha
I have a data-set where each data-point has 112 factors.

I want to remove the factors which are not relevant, and say reduce to 20
factors out of these 112 and then do clustering of data-points using these
20 factors.

How do I do these and how do I figure out which of the 20 factors are
useful for analysis.

I see SVD and PCA implementations, but I am not sure if these give which
elements are removed and which are remaining.

Can someone please help me understand what to do here

thanks,
-Rohit


Re: Multiple Sources Found for Parquet

2016-08-08 Thread Aseem Bansal
Seems that this is a common issue with Spark 2.0.0

I faced similar with CSV. Saw someone facing this with JSON.
https://issues.apache.org/jira/browse/SPARK-16893

On Mon, Aug 8, 2016 at 4:08 PM, Ted Yu  wrote:

> Can you examine classpath to see where *DefaultSource comes from ?*
>
> *Thanks*
>
> On Mon, Aug 8, 2016 at 2:34 AM, 金国栋  wrote:
>
>> I'm using Spark2.0.0 to do sql analysis over parquet files, when using
>> `read().parquet("path")`, or `write().parquet("path")` in Java(I followed
>> the example java file in source code exactly), I always encountered
>>
>> *Exception in thread "main" java.lang.RuntimeException: Multiple sources
>> found for parquet
>> (org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat,
>> org.apache.spark.sql.execution.datasources.parquet.DefaultSource), please
>> specify the fully qualified class name.*
>>
>> Any idea why?
>>
>> Thanks.
>>
>> Best,
>> Jelly
>>
>
>


Re: Multiple Sources Found for Parquet

2016-08-08 Thread Ted Yu
Can you examine classpath to see where *DefaultSource comes from ?*

*Thanks*

On Mon, Aug 8, 2016 at 2:34 AM, 金国栋  wrote:

> I'm using Spark2.0.0 to do sql analysis over parquet files, when using
> `read().parquet("path")`, or `write().parquet("path")` in Java(I followed
> the example java file in source code exactly), I always encountered
>
> *Exception in thread "main" java.lang.RuntimeException: Multiple sources
> found for parquet
> (org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat,
> org.apache.spark.sql.execution.datasources.parquet.DefaultSource), please
> specify the fully qualified class name.*
>
> Any idea why?
>
> Thanks.
>
> Best,
> Jelly
>


Re: 答复: how to generate a column using mapParition and then add it back to the df?

2016-08-08 Thread ayan guha
Hi

I think you should modify initModel() function to getOrCreateModel() and
create the model as singleton object. You may want to refer this link


On Mon, Aug 8, 2016 at 7:44 PM, 莫涛  wrote:

> Hi Ndjido,
>
> Thanks for your reply.
>
> Yes, it is good idea if the model can be broadcast.
>
> I'm working with a built library (on Linux, say classifier.so and
> classifier.h) and it requires the model file is in the local file system.
> As I don't have access to the library code, I write JNI to wrap the
> classifier.
> The model file can be sent to each executor efficiently by addFile and
> getFile.
> But initModel() is still expensive as it actually loads a local file into
> C++ heap memory, which is not serializable.
>
> That's the reason I can not broadcast the model and I have to avoid load
> model as possible as I can.
>
> Best
>
> --
> *发件人:* ndj...@gmail.com 
> *发送时间:* 2016年8月8日 17:16:27
> *收件人:* 莫涛
> *抄送:* user@spark.apache.org
> *主题:* Re: how to generate a column using mapParition and then add it back
> to the df?
>
>
> Hi MoTao,
> What about broadcasting the model?
>
> Cheers,
> Ndjido.
>
> > On 08 Aug 2016, at 11:00, MoTao  wrote:
> >
> > Hi all,
> >
> > I'm trying to append a column to a df.
> > I understand that the new column must be created by
> > 1) using literals,
> > 2) transforming an existing column in df,
> > or 3) generated from udf over this df
> >
> > In my case, the column to be appended is created by processing each row,
> > like
> >
> > val df = spark.createDataFrame(Seq(1.0, 2.0, 3.0)).toDF("value")
> > val func = udf {
> >  v: Double => {
> >val model = initModel()
> >model.process(v)
> >  }
> > }
> > val df2 = df.withColumn("valueWithBias", func(col("value")))
> >
> > This works fine. However, for performance reason, I want to avoid
> > initModel() for each row.
> > So I come with mapParitions, like
> >
> > val df = spark.createDataFrame(Seq(1.0, 2.0, 3.0)).toDF("value")
> > val df2 = df.mapPartitions(rows => {
> >  val model = initModel()
> >  rows.map(row => model.process(row.getAs[Double](0)))
> > })
> > val df3 = df.withColumn("valueWithBias", df2.col("value")) // FAIL
> >
> > But this is wrong as a column of df2 *CANNOT* be appended to df.
> >
> > The only solution I got is to force mapPartitions to return a whole row
> > instead of the new column,
> > ( Something like "row => Row.fromSeq(row.toSeq ++
> > Array(model.process(...)))" )
> > which requires a lot of copy as well.
> >
> > I wonder how to deal with this problem with as few overhead as possible?
> >
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/how-to-generate-a-column-using-
> mapParition-and-then-add-it-back-to-the-df-tp27493.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>



-- 
Best Regards,
Ayan Guha


答复: how to generate a column using mapParition and then add it back to the df?

2016-08-08 Thread 莫涛
Hi Ndjido,

Thanks for your reply.

Yes, it is good idea if the model can be broadcast.

I'm working with a built library (on Linux, say classifier.so and classifier.h) 
and it requires the model file is in the local file system.
As I don't have access to the library code, I write JNI to wrap the classifier.
The model file can be sent to each executor efficiently by addFile and getFile.
But initModel() is still expensive as it actually loads a local file into C++ 
heap memory, which is not serializable.

That's the reason I can not broadcast the model and I have to avoid load model 
as possible as I can.

Best


发件人: ndj...@gmail.com 
发送时间: 2016年8月8日 17:16:27
收件人: 莫涛
抄送: user@spark.apache.org
主题: Re: how to generate a column using mapParition and then add it back to the 
df?


Hi MoTao,
What about broadcasting the model?

Cheers,
Ndjido.

> On 08 Aug 2016, at 11:00, MoTao  wrote:
>
> Hi all,
>
> I'm trying to append a column to a df.
> I understand that the new column must be created by
> 1) using literals,
> 2) transforming an existing column in df,
> or 3) generated from udf over this df
>
> In my case, the column to be appended is created by processing each row,
> like
>
> val df = spark.createDataFrame(Seq(1.0, 2.0, 3.0)).toDF("value")
> val func = udf {
>  v: Double => {
>val model = initModel()
>model.process(v)
>  }
> }
> val df2 = df.withColumn("valueWithBias", func(col("value")))
>
> This works fine. However, for performance reason, I want to avoid
> initModel() for each row.
> So I come with mapParitions, like
>
> val df = spark.createDataFrame(Seq(1.0, 2.0, 3.0)).toDF("value")
> val df2 = df.mapPartitions(rows => {
>  val model = initModel()
>  rows.map(row => model.process(row.getAs[Double](0)))
> })
> val df3 = df.withColumn("valueWithBias", df2.col("value")) // FAIL
>
> But this is wrong as a column of df2 *CANNOT* be appended to df.
>
> The only solution I got is to force mapPartitions to return a whole row
> instead of the new column,
> ( Something like "row => Row.fromSeq(row.toSeq ++
> Array(model.process(...)))" )
> which requires a lot of copy as well.
>
> I wonder how to deal with this problem with as few overhead as possible?
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-generate-a-column-using-mapParition-and-then-add-it-back-to-the-df-tp27493.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


Multiple Sources Found for Parquet

2016-08-08 Thread 金国栋
I'm using Spark2.0.0 to do sql analysis over parquet files, when using
`read().parquet("path")`, or `write().parquet("path")` in Java(I followed
the example java file in source code exactly), I always encountered

*Exception in thread "main" java.lang.RuntimeException: Multiple sources
found for parquet
(org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat,
org.apache.spark.sql.execution.datasources.parquet.DefaultSource), please
specify the fully qualified class name.*

Any idea why?

Thanks.

Best,
Jelly


Re: why spark 2 shell console still sending warnings despite setting log4j.rootCategory=ERROR, console

2016-08-08 Thread Mich Talebzadeh
Sorted. The new section in log4j.properties has to be modified

# Set the default spark-shell log level to WARN. When running the
spark-shell, the
# log level for this class is used to overwrite the root logger's log
level, so that
# the user can have different defaults for the shell and regular Spark apps.
*log4j.logger.org.apache.spark.repl.Main=ERROR*



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 8 August 2016 at 09:33, Mich Talebzadeh 
wrote:

> Hi,
>
> Just doing some tests on Spark 2 version spark-shell
>
> using spark-shell, in my $SPARK_HOME/conf/log4j.properties, I have:
>
> log4j.rootCategory=ERROR, console
>
> I get
>
> spark-shell
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel).
>
> I don't have this issue with spark 1.6.1
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: how to generate a column using mapParition and then add it back to the df?

2016-08-08 Thread ndjido

Hi MoTao,
What about broadcasting the model?

Cheers,
Ndjido.

> On 08 Aug 2016, at 11:00, MoTao  wrote:
> 
> Hi all,
> 
> I'm trying to append a column to a df.
> I understand that the new column must be created by
> 1) using literals,
> 2) transforming an existing column in df,
> or 3) generated from udf over this df
> 
> In my case, the column to be appended is created by processing each row,
> like
> 
> val df = spark.createDataFrame(Seq(1.0, 2.0, 3.0)).toDF("value")
> val func = udf { 
>  v: Double => {
>val model = initModel()
>model.process(v)
>  }
> }
> val df2 = df.withColumn("valueWithBias", func(col("value")))
> 
> This works fine. However, for performance reason, I want to avoid
> initModel() for each row.
> So I come with mapParitions, like
> 
> val df = spark.createDataFrame(Seq(1.0, 2.0, 3.0)).toDF("value")
> val df2 = df.mapPartitions(rows => {
>  val model = initModel()  
>  rows.map(row => model.process(row.getAs[Double](0)))
> })
> val df3 = df.withColumn("valueWithBias", df2.col("value")) // FAIL
> 
> But this is wrong as a column of df2 *CANNOT* be appended to df.
> 
> The only solution I got is to force mapPartitions to return a whole row
> instead of the new column,
> ( Something like "row => Row.fromSeq(row.toSeq ++
> Array(model.process(...)))" )
> which requires a lot of copy as well.
> 
> I wonder how to deal with this problem with as few overhead as possible?
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-generate-a-column-using-mapParition-and-then-add-it-back-to-the-df-tp27493.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: hdfs persist rollbacks when spark job is killed

2016-08-08 Thread Gourav Sengupta
There is a mv command in GCS but I am not quite sure (because of limitation
of data on which I work on it and lack my budget) whether the mv command
actually copies and deletes or just re-points the files to a new directory
by changing its meta-data.

Yes the Data Quality checks are done after the job has completed
successfully (without quitting). If the Data Quality checks fail within
certain threshold then the data is not deleted, but just generate a
warning. If more than a particular threshold, then the data is deleted and
then a warning is raised.



Regards,
Gourav Sengupta

On Mon, Aug 8, 2016 at 7:51 AM, Chanh Le  wrote:

> Thank you Gourav,
>
> Moving files from _temp folders to main folders is an additional overhead
> when you are working on S3 as there is no move operation.
>
> Good catch. Is that GCS the same?
>
> I generally have a set of Data Quality checks after each job to ascertain
> whether everything went fine, the results are stored so that it can be
> published in a graph for monitoring, thus solving two purposes.
>
>
> So that mean after the job done you query the data to check right?
>
>
>
> On Aug 8, 2016, at 1:46 PM, Gourav Sengupta 
> wrote:
>
> But you have to be careful, that is the default setting. There is a way
> you can overwrite it so that the writing to _temp folder does not take
> place and you write directly to the main folder.
>
> Moving files from _temp folders to main folders is an additional overhead
> when you are working on S3 as there is no move operation.
>
> I generally have a set of Data Quality checks after each job to ascertain
> whether everything went fine, the results are stored so that it can be
> published in a graph for monitoring, thus solving two purposes.
>
>
> Regards,
> Gourav Sengupta
>
> On Mon, Aug 8, 2016 at 7:41 AM, Chanh Le  wrote:
>
>> It’s *out of the box* in Spark.
>> When you write data into hfs or any storage it only creates a new parquet
>> folder properly if your Spark job was success else only *_temp* folder
>> inside to mark it’s still not success (spark was killed) or nothing inside
>> (Spark job was failed).
>>
>>
>>
>>
>>
>> On Aug 8, 2016, at 1:35 PM, Sumit Khanna  wrote:
>>
>> Hello,
>>
>> the use case is as follows :
>>
>> say I am inserting 200K rows as dataframe.write.formate("parquet") etc
>> etc (like a basic write to hdfs  command), but say due to some reason or
>> rhyme my job got killed, when the run was in the mid of it, meaning lets
>> say I was only able to insert 100K rows when my job got killed.
>>
>> twist is that I might actually be upserting, and even in append only
>> cases, my delta change data that is being inserted / written in this run
>> might actually be spanning across various partitions.
>>
>> Now what I am looking for is something to role the changes back, like the
>> batch insertion should be all or nothing, and even if it is partition, it
>> must must be atomic to each row/ unit of insertion.
>>
>> Kindly help.
>>
>> Thanks,
>> Sumit
>>
>>
>>
>
>


how to generate a column using mapParition and then add it back to the df?

2016-08-08 Thread MoTao
Hi all,

I'm trying to append a column to a df.
I understand that the new column must be created by
1) using literals,
2) transforming an existing column in df,
or 3) generated from udf over this df

In my case, the column to be appended is created by processing each row,
like

val df = spark.createDataFrame(Seq(1.0, 2.0, 3.0)).toDF("value")
val func = udf { 
  v: Double => {
val model = initModel()
model.process(v)
  }
}
val df2 = df.withColumn("valueWithBias", func(col("value")))

This works fine. However, for performance reason, I want to avoid
initModel() for each row.
So I come with mapParitions, like

val df = spark.createDataFrame(Seq(1.0, 2.0, 3.0)).toDF("value")
val df2 = df.mapPartitions(rows => {
  val model = initModel()  
  rows.map(row => model.process(row.getAs[Double](0)))
})
val df3 = df.withColumn("valueWithBias", df2.col("value")) // FAIL

But this is wrong as a column of df2 *CANNOT* be appended to df.

The only solution I got is to force mapPartitions to return a whole row
instead of the new column,
( Something like "row => Row.fromSeq(row.toSeq ++
Array(model.process(...)))" )
which requires a lot of copy as well.

I wonder how to deal with this problem with as few overhead as possible?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-generate-a-column-using-mapParition-and-then-add-it-back-to-the-df-tp27493.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



why spark 2 shell console still sending warnings despite setting log4j.rootCategory=ERROR, console

2016-08-08 Thread Mich Talebzadeh
Hi,

Just doing some tests on Spark 2 version spark-shell

using spark-shell, in my $SPARK_HOME/conf/log4j.properties, I have:

log4j.rootCategory=ERROR, console

I get

spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).

I don't have this issue with spark 1.6.1

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Spark driver memory keeps growing

2016-08-08 Thread Pierre Villard
Hi,

I'm running a job on Spark 1.5.2 and I get OutOfMemoryError on broadcast
variables access. The thing is I am not sure to understand why the
broadcast keeps growing and why it does at this place of code.

Basically, I have a large input file, each line having a key. I group by
key my lines to have one object with all data related to a given key. Then
I am doing a map to iterate over my objects, and for each object, I iterate
over a collection which is a broadcast variable. The exception is thrown
when iterating on the broadcast variable.

Here is a quick example:

Input file :

key1,row1
key2,row1
key1,row2
key2,row2

Broadcast variable is a list: List(1,2,3)

I group by key my input :

key1, (row1,row2)
key2, (row1,row2)

Then I do a map

myRdd.map( object -> for(item <- myBroadcast.value) executeFunction(object,
item) )

I know the groupByKey is a very costly operation but I am not sure I can
avoid it since the 'executeFunction' needs to have all the lines for a
given key to be executed. Besides the stage where the groupByKey is
performed is successfully completed when the exception is thrown.

Here is an extract from the logs:

16/08/04 03:17:50 WARN storage.MemoryStore: Not enough space to cache
broadcast_90 in memory! (computed 2.3 GB so far) 16/08/04 03:18:37 WARN
storage.MemoryStore: Not enough space to cache broadcast_90 in memory!
(computed 2.3 GB so far) 16/08/04 03:19:22 WARN storage.MemoryStore: Not
enough space to cache broadcast_90 in memory! (computed 2.4 GB so far)
16/08/04 03:20:07 WARN storage.MemoryStore: Not enough space to cache
broadcast_90 in memory! (computed 2.5 GB so far) 16/08/04 03:20:53 WARN
storage.MemoryStore: Not enough space to cache broadcast_90 in memory!
(computed 2.6 GB so far) 16/08/04 03:21:11 WARN storage.MemoryStore: Not
enough space to cache broadcast_90 in memory! (computed 2.7 GB so far)
16/08/04 03:21:15 WARN storage.MemoryStore: Not enough space to cache
broadcast_90 in memory! (computed 2.5 GB so far) 16/08/04 03:44:22 WARN
storage.MemoryStore: Not enough space to cache broadcast_90 in memory!
(computed 3.4 GB so far) 16/08/04 03:53:03 WARN storage.MemoryStore: Not
enough space to cache broadcast_90 in memory! (computed 3.8 GB so far)
16/08/04 04:02:00 WARN storage.MemoryStore: Not enough space to cache
broadcast_90 in memory! (computed 4.1 GB so far) 16/08/04 04:20:52 ERROR
executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM 16/08/04
04:20:52 ERROR executor.Executor: Exception in task 1.0 in stage 62.1 (TID
1109) java.lang.OutOfMemoryError: Java heap space at
java.util.IdentityHashMap.resize(IdentityHashMap.java:469) at
java.util.IdentityHashMap.put(IdentityHashMap.java:445) at
org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:159)
at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:229)
at
org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:194)
at
org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186)
at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54) at
org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
at
org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
at
org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:165)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:550)
at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:168)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1175) at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)

...

at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727) at
org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:115)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:30)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at
org.apache.spark.util.collection.CompactBuffer.map(CompactBuffer.scala:30)

Any suggestion regarding what could explain this behavior?

Thanks!


Re: mapWithState handle timeout

2016-08-08 Thread jackerli
i get answer from stack overflow: 
http://stackoverflow.com/questions/38397688/spark-mapwithstate-api-explanation

  

when key is timeout, the new value is None; in my case, i set value when
timeout occurs



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/mapWithState-handle-timeout-tp27422p27492.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: What are the configurations needs to connect spark and ms-sql server?

2016-08-08 Thread Deepak Sharma
Hi Devi
Please make sure the jdbc jar is in the spark classpath.
With spark-submit , you can use --jars option to specify the sql server
jdbc jar.

Thanks
Deepak

On Mon, Aug 8, 2016 at 1:14 PM, Devi P.V  wrote:

> Hi all,
>
> I am trying to write a spark dataframe into MS-Sql Server.I have tried
> using the following code,
>
>  val sqlprop = new java.util.Properties
> sqlprop.setProperty("user","uname")
> sqlprop.setProperty("password","pwd")
> sqlprop.setProperty("driver","com.microsoft.sqlserver.jdbc.
> SQLServerDriver")
> val url = "jdbc:sqlserver://samplesql.amazonaws.com:1433/dbName"
> val dfWriter = df.write
> dfWriter.jdbc(url, "tableName", sqlprop)
>
> But I got following error
>
> Exception in thread "main" java.lang.ClassNotFoundException:
> com.microsoft.sqlserver.jdbc.SQLServerDriver
>
> what are the configurations needs to connect to MS-Sql Server.Not found
> any library dependencies for connecting spark and MS-Sql.
>
> Thanks
>



-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

2016-08-08 Thread Hao Ren
Yes, it is.
You can define a udf like that.
Basically, it's a udf Int => Int which is a closure contains a non
serializable object.
The latter should cause Task not serializable exception.

Hao

On Mon, Aug 8, 2016 at 5:08 AM, Muthu Jayakumar  wrote:

> Hello Hao Ren,
>
> Doesn't the code...
>
> val add = udf {
>   (a: Int) => a + notSer.value
> }
> Mean UDF function that Int => Int ?
>
> Thanks,
> Muthu
>
> On Sun, Aug 7, 2016 at 2:31 PM, Hao Ren  wrote:
>
>> I am playing with spark 2.0
>> What I tried to test is:
>>
>> Create a UDF in which there is a non serializable object.
>> What I expected is when this UDF is called during materializing the
>> dataFrame where the UDF is used in "select", an task non serializable
>> exception should be thrown.
>> It depends also which "action" is called on that dataframe.
>>
>> Here is the code for reproducing the pb:
>>
>> 
>> object DataFrameSerDeTest extends App {
>>
>>   class A(val value: Int) // It is not serializable
>>
>>   def run() = {
>> val spark = SparkSession
>>   .builder()
>>   .appName("DataFrameSerDeTest")
>>   .master("local[*]")
>>   .getOrCreate()
>>
>> import org.apache.spark.sql.functions.udf
>> import spark.sqlContext.implicits._
>>
>> val notSer = new A(2)
>> val add = udf {
>>   (a: Int) => a + notSer.value
>> }
>> val df = spark.createDataFrame(Seq(
>>   (1, 2),
>>   (2, 2),
>>   (3, 2),
>>   (4, 2)
>> )).toDF("key", "value")
>>   .select($"key", add($"value").as("added"))
>>
>> df.show() // *It should not work because the udf contains a
>> non-serializable object, but it works*
>>
>> df.filter($"key" === 2).show() // *It does not work as expected
>> (org.apache.spark.SparkException: Task not serializable)*
>>   }
>>
>>   run()
>> }
>> 
>>
>> Also, I tried collect(), count(), first(), limit(). All of them worked
>> without non-serializable exceptions.
>> It seems only filter() throws the exception. (feature or bug ?)
>>
>> Any ideas ? Or I just messed things up ?
>> Any help is highly appreciated.
>>
>> --
>> Hao Ren
>>
>> Data Engineer @ leboncoin
>>
>> Paris, France
>>
>
>


-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


What are the configurations needs to connect spark and ms-sql server?

2016-08-08 Thread Devi P.V
Hi all,

I am trying to write a spark dataframe into MS-Sql Server.I have tried
using the following code,

 val sqlprop = new java.util.Properties
sqlprop.setProperty("user","uname")
sqlprop.setProperty("password","pwd")

sqlprop.setProperty("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver")
val url = "jdbc:sqlserver://samplesql.amazonaws.com:1433/dbName"
val dfWriter = df.write
dfWriter.jdbc(url, "tableName", sqlprop)

But I got following error

Exception in thread "main" java.lang.ClassNotFoundException:
com.microsoft.sqlserver.jdbc.SQLServerDriver

what are the configurations needs to connect to MS-Sql Server.Not found any
library dependencies for connecting spark and MS-Sql.

Thanks


Re: Is Spark right for my use case?

2016-08-08 Thread Deepak Sharma
Hi Danellis
For point 1 , spark streaming is something to look at.
For point 2 , you can create DAO from cassandra on each stream
processing.This may be costly operation though , but to do real time
processing of data , you have to live with t.
Point 3 is covered in point 2 above.
Since you are starting fresh , i would suggest going with 2.0 as they have
many features such as dataset /structured querying of streams etc over
previous releases.
Thanks
Deepak

On Mon, Aug 8, 2016 at 11:52 AM, danellis  wrote:

> Spark n00b here.
>
> Working with online retailers, I start with a list of their products in
> Cassandra (with prices, stock levels, descriptions, etc) and then receive
> an
> HTTP request every time one of them changes. For each change, I update the
> product in Cassandra and store the change with the old and new values.
>
> What I'd like to do is provide a dashboard with various metrics. Some of
> them are trivial, such as "last n changes". Others, like number of
> in-stock/out-of-stock products would be more complex to retrieve from
> Cassandra, because they're an aggregate of the whole product set.
>
> I'm thinking about streaming the changes into Spark (via RabbitMQ) to
> generate the data needed for the aggregate metrics, and either storing the
> results in Cassandra or publishing them back to RabbitMQ (depending on
> whether I have the dashboard poll or use a WebSocket).
>
> I have a few questions:
>
> 1) Does this seem like a good use case for Spark?
>
> 2) How much work is it appropriate for a transformation to do? For example,
> my API service currently checks the update against the current data and
> only
> publishes a change if they differ. That sounds to me like it could be a
> filter operation on a stream of all the updates, but it would require
> accessing data from Cassandra inside the filter transformation. Is that
> okay, or something to be avoided? The changes that make it through the
> filter would also have to be logged in Cassandra. Is that crossing concerns
> too much?
>
> 3) If I'm starting out with existing data, how do I take that into account
> when starting to do stream processing? Would I write something to take my
> logged changes from Cassandra and publish them to RabbitMQ before I start
> my
> real streaming? Seems like the switch-over might be tricky. (Note: I don't
> necessarily need to do this, depending on how things go.)
>
> 4) Is it a good idea to start with 2.0 now? I see there's an AMQP module
> with 2.0 support and the Cassandra one supports 2.0 with a little work.
>
> Thanks for any feedback.
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Is-Spark-right-for-my-use-case-tp27491.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: [Spark1.6] Or (||) operator not working in DataFrame

2016-08-08 Thread Mich Talebzadeh
I am afraid logic is incorrect. that is the reason why it is not working.

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 8 August 2016 at 04:36, Divya Gehlot  wrote:

> I tried with condition expression  also but it didn't work :(
>
> On Aug 8, 2016 11:13 AM, "Chanh Le"  wrote:
>
>> You should use *df.where(conditionExpr)* which is more convenient to
>> express some simple term in SQL.
>>
>>
>> /**
>>  * Filters rows using the given SQL expression.
>>  * {{{
>>  *   peopleDf.where("age > 15")
>>  * }}}
>>  * @group dfops
>>  * @since 1.5.0
>>  */
>> def where(conditionExpr: String): DataFrame = {
>>   filter(Column(SqlParser.parseExpression(conditionExpr)))
>> }
>>
>>
>>
>>
>>
>> On Aug 7, 2016, at 10:58 PM, Mich Talebzadeh 
>> wrote:
>>
>> although the logic should be col1 <> a && col(1) <> b
>>
>> to exclude both
>>
>> Like
>>
>> df.filter('transactiontype > " ").filter(not('transactiontype ==="DEB")
>> && not('transactiontype ==="BGC")).select('transaction
>> type).distinct.collect.foreach(println)
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 7 August 2016 at 16:53, Mich Talebzadeh 
>> wrote:
>>
>>> try similar to this
>>>
>>> df.filter(not('transactiontype ==="DEB") || not('transactiontype
>>> ==="CRE"))
>>>
>>> HTH
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 7 August 2016 at 15:43, Divya Gehlot  wrote:
>>>
 Hi,
 I have use case where I need to use or[||] operator in filter condition.
 It seems its not working its taking the condition before the operator
 and ignoring the other filter condition after or operator.
 As any body faced similar issue .

 Psuedo code :
 df.filter(col("colName").notEqual("no_value") ||
 col("colName").notEqual(""))

 Am I missing something.
 Would really appreciate the help.


 Thanks,
 Divya

>>>
>>>
>>
>>


Re: Spark 2.0.0 - Broadcast variable - What is ClassTag?

2016-08-08 Thread Holden Karau
If your using this from Java you might find it easier to construct a
JavaSparkContext, the broadcast function will automatically use a fake
class tag.

On Sun, Aug 7, 2016 at 11:57 PM, Aseem Bansal  wrote:

> I am using the following to broadcast and it explicitly requires classtag
>
> sparkSession.sparkContext().broadcast
>
> On Mon, Aug 8, 2016 at 12:09 PM, Holden Karau 
> wrote:
>
>> Classtag is Scala concept (see http://docs.scala-lang.or
>> g/overviews/reflection/typetags-manifests.html) - although this should
>> not be explicitly required - looking at http://spark.apache.org/doc
>> s/latest/api/scala/index.html#org.apache.spark.SparkContext we can see
>> that in Scala the classtag tag is implicit and if your calling from Java
>> http://spark.apache.org/docs/latest/api/scala/index.htm
>> l#org.apache.spark.api.java.JavaSparkContext the classtag doesn't need
>> to be specified (instead it uses a "fake" class tag automatically for you).
>> Where are you seeing the different API?
>>
>> On Sun, Aug 7, 2016 at 11:32 PM, Aseem Bansal 
>> wrote:
>>
>>> Earlier for broadcasting we just needed to use
>>>
>>> sparkcontext.broadcast(objectToBroadcast)
>>>
>>> But now it is
>>>
>>> sparkcontext.broadcast(objectToBroadcast, classTag)
>>>
>>> What is classTag here?
>>>
>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau