Re: Re: spark sql data skew

2018-07-20 Thread Xiaomeng Wan
Try divide and conquer: create a column x for the first character of userid,
and group by company + x. If the groups are still too large, try the first two characters.
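
A rough sketch of this two-step approach (not from the original thread; a DataFrame df
with columns "company" and "userid" is assumed):

import org.apache.spark.sql.functions.{col, countDistinct, substring, sum}

// Bucket users by the first character of userid so no single group is huge.
val withPrefix = df.withColumn("x", substring(col("userid"), 1, 1))

// Distinct counts per (company, prefix) bucket are exact because the buckets are disjoint.
val perBucket = withPrefix
  .groupBy(col("company"), col("x"))
  .agg(countDistinct(col("userid")).as("partial_count"))

// Summing the disjoint partial counts gives the distinct user count per company.
val perCompany = perBucket
  .groupBy(col("company"))
  .agg(sum(col("partial_count")).as("distinct_users"))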

On 17 July 2018 at 02:25, 崔苗  wrote:

> We have 30 GB of user data; how do we get the distinct user count after
> creating a composite key based on company and userid?
>
>
> On 2018-07-13 18:24:52, Jean Georges Perrin wrote:
>
> Just thinking out loud… repartition by key? create a composite key based
> on company and userid?
>
> How big is your dataset?
>
> On Jul 13, 2018, at 06:20, 崔苗  wrote:
>
> Hi,
> When I count(distinct userId) by company, I hit data skew and the task
> takes too long. How can I count distinct values by key on skewed data in
> Spark SQL?
>
> thanks for any reply
>
>
>
>


Re: IDE for python

2017-06-28 Thread Xiaomeng Wan
Thanks to all of you. I will give PyCharm a try.

Regards,
Shawn

On 28 June 2017 at 06:07, Sotola, Radim <radim.sot...@teradata.com> wrote:

> I know. But I pay around 20 euros per month for all JetBrains products,
> and I think that is not so much – in Czechia it is one evening in the pub.
>
>
>
> *From:* Md. Rezaul Karim [mailto:rezaul.ka...@insight-centre.org]
> *Sent:* Wednesday, June 28, 2017 12:55 PM
> *To:* Sotola, Radim <radim.sot...@teradata.com>
> *Cc:* spark users <user@spark.apache.org>; ayan guha <guha.a...@gmail.com>;
> Abhinay Mehta <abhinay.me...@gmail.com>; Xiaomeng Wan <shawn...@gmail.com>
> *Subject:* RE: IDE for python
>
>
>
> By the way, PyCharm from JetBrains also has a Community Edition, which is
> free and open source.
>
>
>
> Moreover, if you are a student, you can use the professional edition for
> students as well.
>
>
>
> For more, see here https://www.jetbrains.com/student/
>
>
>
> On Jun 28, 2017 11:18 AM, "Sotola, Radim" <radim.sot...@teradata.com>
> wrote:
>
> PyCharm is a good choice. I buy a monthly subscription and can see that
> PyCharm development continues (I mean that it is not a tool that somebody
> developed and then left without any upgrades).
>
>
>
> *From:* Abhinay Mehta [mailto:abhinay.me...@gmail.com]
> *Sent:* Wednesday, June 28, 2017 11:06 AM
> *To:* ayan guha <guha.a...@gmail.com>
> *Cc:* User <user@spark.apache.org>; Xiaomeng Wan <shawn...@gmail.com>
> *Subject:* Re: IDE for python
>
>
>
> I use PyCharm and it works a treat. The big advantage I find is that I can
> use the same keyboard shortcuts that I do when developing with IntelliJ IDEA
> for Scala or Java.
>
>
>
>
>
> On 27 June 2017 at 23:29, ayan guha <guha.a...@gmail.com> wrote:
>
> Depends on the need. For data exploration, I use notebooks whenever I can.
> For development, any good text editor should work; I use Sublime. If you
> want autocompletion and all, you can use Eclipse or PyCharm, I do not :)
>
>
>
> On Wed, 28 Jun 2017 at 7:17 am, Xiaomeng Wan <shawn...@gmail.com> wrote:
>
> Hi,
>
> I recently switched from Scala to Python, and wondered which IDE people
> are using for Python. I have heard about PyCharm, Spyder, etc. How do they
> compare with each other?
>
>
>
> Thanks,
>
> Shawn
>
> --
>
> Best Regards,
> Ayan Guha
>
>
>
>


IDE for python

2017-06-27 Thread Xiaomeng Wan
Hi,
I recently switched from Scala to Python, and wondered which IDE people are
using for Python. I have heard about PyCharm, Spyder, etc. How do they compare
with each other?

Thanks,
Shawn


Re: Un-exploding / denormalizing Spark SQL help

2017-02-08 Thread Xiaomeng Wan
You could also try pivot.
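
A rough sketch of the pivot route (not from the thread; it assumes the data DataFrame
and the id/name/data/extra/priority columns discussed below, with at most three rows per
(id, name) group):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank, first}

// Rank rows within each (id, name) group by priority, as in the thread.
val ranked = data.withColumn("rank",
  dense_rank().over(Window.partitionBy("id", "name").orderBy("priority")))

// Pivot on the rank; listing the expected values (1, 2, 3) avoids an extra pass to discover them.
val pivoted = ranked
  .groupBy(col("id"), col("name"))
  .pivot("rank", Seq(1, 2, 3))
  .agg(first(col("data")).as("data"), first(col("extra")).as("extra"))
// Result columns are named 1_data, 1_extra, 2_data, 2_extra, 3_data, 3_extra.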

On 7 February 2017 at 16:13, Everett Anderson 
wrote:

>
>
> On Tue, Feb 7, 2017 at 2:21 PM, Michael Armbrust 
> wrote:
>
>> I think the fastest way is likely to use a combination of conditionals
>> (when / otherwise), first (ignoring nulls), while grouping by the id.
>> This should get the answer with only a single shuffle.
>>
>> Here is an example.
>>
>
> Very cool! Using the simpler aggregates feels cleaner.
>
>
>>
>> On Tue, Feb 7, 2017 at 5:07 PM, Jacek Laskowski  wrote:
>>
>>> Hi Everett,
>>>
>>> That's pretty much what I'd do. Can't think of a way to beat your
>>> solution. Why do you "feel vaguely uneasy about it"?
>>>
>>
> Maybe it felt like I was unnecessarily grouping-by twice, but probably
> mostly that I hadn't used pivot before.
>
> Interestingly, the physical plans are not especially different between
> these two solutions after the rank column is added. They both have two
> SortAggregates that seem to be figuring out where to put results based on
> the rank:
>
> My original one:
>
> == Physical Plan ==
> *Project [id#279, name#280, 1#372.extra AS extra1#409, 1#372.data AS
> data1#435, 1#372.priority AS priority1#462, 2#374.extra AS extra2#490,
> 2#374.data AS data2#519, 2#374.priority AS priority2#549, 3#376.extra AS
> extra3#580, 3#376.data AS data3#612, 3#376.priority AS priority3#645]
> +- SortAggregate(key=[id#279,name#280], functions=[first(if
> ((cast(rank#292 as double) = 1.0)) temp_struct#312 else null,
> true),first(if ((cast(rank#292 as double) = 2.0)) temp_struct#312 else
> null, true),first(if ((cast(rank#292 as double) = 3.0)) temp_struct#312
> else null, true)])
>+- SortAggregate(key=[id#279,name#280], functions=[partial_first(if
> ((cast(rank#292 as double) = 1.0)) temp_struct#312 else null,
> true),partial_first(if ((cast(rank#292 as double) = 2.0)) temp_struct#312
> else null, true),partial_first(if ((cast(rank#292 as double) = 3.0))
> temp_struct#312 else null, true)])
>   +- *Project [id#279, name#280, rank#292, struct(extra#281, data#282,
> priority#283) AS temp_struct#312]
>  +- Window [denserank(priority#283) windowspecdefinition(id#279,
> name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
> ROW) AS rank#292], [id#279, name#280], [priority#283 ASC]
> +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false, 0
>+- Exchange hashpartitioning(id#279, name#280, 200)
>   +- Scan ExistingRDD[id#279,name#280,
> extra#281,data#282,priority#283]
>
>
> And modifying Michael's slightly to use a rank:
>
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
>
> def getColumnWithRank(column: String, rank: Int) = {
>   first(when(col("rank") === lit(rank), col(column)).otherwise(null),
>     ignoreNulls = true)
> }
>
> val withRankColumn = data.withColumn("rank",
>   dense_rank().over(Window.partitionBy("id", "name").orderBy("priority")))
>
> val modCollapsed = withRankColumn
>   .groupBy($"id", $"name")
>   .agg(
>     getColumnWithRank("data", 1) as 'data1,
>     getColumnWithRank("data", 2) as 'data2,
>     getColumnWithRank("data", 3) as 'data3,
>     getColumnWithRank("extra", 1) as 'extra1,
>     getColumnWithRank("extra", 2) as 'extra2,
>     getColumnWithRank("extra", 3) as 'extra3)
>
>
> modCollapsed.explain
>
> == Physical Plan ==
> SortAggregate(key=[id#279,name#280], functions=[first(CASE WHEN (rank#965
> = 1) THEN data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 2) THEN
> data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 3) THEN data#282
> ELSE null END, true),first(CASE WHEN (rank#965 = 1) THEN extra#281 ELSE
> null END, true),first(CASE WHEN (rank#965 = 2) THEN extra#281 ELSE null
> END, true),first(CASE WHEN (rank#965 = 3) THEN extra#281 ELSE null END,
> true)])
> +- SortAggregate(key=[id#279,name#280], functions=[partial_first(CASE
> WHEN (rank#965 = 1) THEN data#282 ELSE null END, true),partial_first(CASE
> WHEN (rank#965 = 2) THEN data#282 ELSE null END, true),partial_first(CASE
> WHEN (rank#965 = 3) THEN data#282 ELSE null END, true),partial_first(CASE
> WHEN (rank#965 = 1) THEN extra#281 ELSE null END, true),partial_first(CASE
> WHEN (rank#965 = 2) THEN extra#281 ELSE null END, true),partial_first(CASE
> WHEN (rank#965 = 3) THEN extra#281 ELSE null END, true)])
>+- *Project [id#279, name#280, extra#281, data#282, rank#965]
>   +- Window [denserank(priority#283) windowspecdefinition(id#279,
> name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
> ROW) AS rank#965], [id#279, name#280], [priority#283 ASC]
>  +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false, 0
> +- Exchange hashpartitioning(id#279, name#280, 200)
>+- Scan 

Re: How to save spark-ML model in Java?

2017-01-19 Thread Xiaomeng Wan
cv.fit is going to give you a CrossValidatorModel. If you want to extract
the real model that was built, you need to do:

val cvModel = cv.fit(data)

val plmodel = cvModel.bestModel.asInstanceOf[PipelineModel]

val model = plmodel.stages(2).asInstanceOf[whatever_model]

Then you can call model.save.
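
For example, a minimal sketch of saving the best pipeline (the output path is an assumption):

import org.apache.spark.ml.PipelineModel

val bestPipeline = cvModel.bestModel.asInstanceOf[PipelineModel]
bestPipeline.write.overwrite().save("output/bestPipelineModel")  // individual stages save the same way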

On 19 January 2017 at 11:31, Minudika Malshan  wrote:

> Hi,
>
> Thanks Rezaul and Asher Krim.
>
> The method suggested by Rezaul works fine for NaiveBayes but still fails
> for RandomForest and the multilayer perceptron classifier.
> Everything is saved properly up to this stage.
>
> CrossValidator cv = new CrossValidator()
> .setEstimator(pipeline)
> .setEvaluator(evaluator)
> .setEstimatorParamMaps(paramGrid)
> .setNumFolds(folds);
>
> Any idea on how to resolve this?
>
>
>
>
>
> On Thu, Jan 12, 2017 at 9:13 PM, Asher Krim  wrote:
>
>> What version of Spark are you on?
>> Although it's cut off, I think your error is with RandomForestClassifier,
>> is that correct? If so, you should upgrade to Spark 2, since I think this
>> class only became writable/readable in Spark 2
>> (https://github.com/apache/spark/pull/12118).
>>
>> On Thu, Jan 12, 2017 at 8:43 AM, Md. Rezaul Karim <
>> rezaul.ka...@insight-centre.org> wrote:
>>
>>> Hi Malshan,
>>>
>>> The error says that one (or more) of the estimators/stages is either not
>>> writable or not compatible with the overwrite/model-write operation.
>>>
>>> Suppose you want to configure an ML pipeline consisting of three stages
>>> (i.e. estimators): tokenizer, hashingTF, and nb:
>>>
>>> val nb = new NaiveBayes().setSmoothing(0.1)
>>> val tokenizer = new Tokenizer().setInputCol("label").setOutputCol("label")
>>> val hashingTF = new HashingTF().setInputCol(tokenizer.getOutputCol).setOutputCol("features")
>>> val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, nb))
>>>
>>>
>>> Now check whether all the stages are writable. To make this easier, try
>>> saving the stages individually, e.g.:
>>>
>>> tokenizer.write.save("path")
>>> hashingTF.write.save("path")
>>> After that suppose you want to perform a 10-fold cross-validation as
>>> follows:
>>> val cv = new CrossValidator()
>>>   .setEstimator(pipeline)
>>>   .setEvaluator(new BinaryClassificationEvaluator)
>>>   .setEstimatorParamMaps(paramGrid)
>>>   .setNumFolds(10)
>>>
>>> Where:
>>> val paramGrid = new ParamGridBuilder()
>>>   .addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
>>>   .addGrid(nb.smoothing, Array(0.001, 0.0001))
>>>   .build()
>>>
>>> Now the model that you trained using the training set should be writable
>>> if all of the stages are okay:
>>> val model = cv.fit(trainingData)
>>> model.write.overwrite().save("output/NBModel")
>>>
>>>
>>>
>>> Hope that helps.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> Regards,
>>> _
>>> *Md. Rezaul Karim*, BSc, MSc
>>> PhD Researcher, INSIGHT Centre for Data Analytics
>>> National University of Ireland, Galway
>>> IDA Business Park, Dangan, Galway, Ireland
>>> Web: http://www.reza-analytics.eu/index.html
>>> 
>>>
>>> On 12 January 2017 at 09:09, Minudika Malshan 
>>> wrote:
>>>
 Hi,

 When I try to save a pipeline model using Spark ML (Java), the
 following exception is thrown.


 java.lang.UnsupportedOperationException: Pipeline write will fail on
 this Pipeline because it contains a stage which does not implement
 Writable. Non-Writable stage: rfc_98f8c9e0bd04 of type class
 org.apache.spark.ml.classification.Rand


 Here is my code segment.


 model.write().overwrite().save("mypath");


 How to resolve this?

 Thanks and regards!

 Minudika


>>>
>>
>>
>> --
>> Asher Krim
>> Senior Software Engineer
>>
>
>
>
> --
> *Minudika Malshan*
> Undergraduate
> Department of Computer Science and Engineering
> University of Moratuwa
> Sri Lanka.
> 
>
>
>


Re: filter rows by all columns

2017-01-17 Thread Xiaomeng Wan
Thank you, Hyukjin.
It works. This is what I ended up doing:

df.filter(_.toSeq.zipWithIndex.forall(v =>
  math.abs(v._1.toString().toDouble - means(v._2)) <= 3 * stddevs(v._2))).show()
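
For reference, a sketch of one way the means/stddevs arrays above could be built
(not from the original thread; it assumes every column of df is numeric, in df.columns order):

import org.apache.spark.sql.functions.{col, mean, stddev}

val cols = df.columns
// One pass: mean and stddev for every column, interleaved in a single row.
val stats = df.select(cols.flatMap(c => Seq(mean(col(c)), stddev(col(c)))): _*).head()
val means   = cols.indices.map(i => stats.getDouble(2 * i)).toArray
val stddevs = cols.indices.map(i => stats.getDouble(2 * i + 1)).toArray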


Regards,

Shawn

On 16 January 2017 at 18:30, Hyukjin Kwon  wrote:

> Hi Shawn,
>
> Could we do this as below?
>
>  For "any value matches":
>
> scala> val df = spark.range(10).selectExpr("id as a", "id / 2 as b")
> df: org.apache.spark.sql.DataFrame = [a: bigint, b: double]
>
> scala> df.filter(_.toSeq.exists(v => v == 1)).show()
> +---+---+
> |  a|  b|
> +---+---+
> |  1|0.5|
> |  2|1.0|
> +---+---+
>
> ​
>
> or for "all values match":
>
> scala> val df = spark.range(10).selectExpr("id as a", "id / 2 as b")
> df: org.apache.spark.sql.DataFrame = [a: bigint, b: double]
>
> scala> df.filter(_.toSeq.forall(v => v == 0)).show()
> +---+---+
> |  a|  b|
> +---+---+
> |  0|0.0|
> +---+---+
>
> ​
>
>
>
>
>
> 2017-01-17 7:27 GMT+09:00 Shawn Wan :
>
>> I need to filter out outliers from a dataframe by all columns. I can
>> manually list all columns like:
>>
>> df.filter(x => math.abs(x.get(0).toString().toDouble - means(0)) <= 3*stddevs(0))
>>   .filter(x => math.abs(x.get(1).toString().toDouble - means(1)) <= 3*stddevs(1))
>>
>> ...
>>
>> But I want to turn it into a general function which can handle a variable
>> number of columns. How could I do that? Thanks in advance!
>>
>>
>> Regards,
>>
>> Shawn
>>
>>
>
>


filter rows based on all columns

2017-01-13 Thread Xiaomeng Wan
I need to filter out outliers from a dataframe on all columns. I can
manually list all columns like:

df.filter(x => math.abs(x.get(0).toString().toDouble - means(0)) <= 3*stddevs(0))
  .filter(x => math.abs(x.get(1).toString().toDouble - means(1)) <= 3*stddevs(1))

...

But I want to turn it into a general function which can handle a variable
number of columns. How could I do that? Thanks in advance!


Regards,

Shawn


spark linear regression error training dataset is empty

2016-12-21 Thread Xiaomeng Wan
Hi,

I am running linear regression on a dataframe and get the following error:

Exception in thread "main" java.lang.AssertionError: assertion failed:
Training dataset is empty.

at scala.Predef$.assert(Predef.scala:170)

at
org.apache.spark.ml.optim.WeightedLeastSquares$Aggregator.validate(WeightedLeastSquares.scala:247)

at
org.apache.spark.ml.optim.WeightedLeastSquares.fit(WeightedLeastSquares.scala:82)

at
org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:180)

at
org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:70)

at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)

Here is the data and code:

{"label":79.3,"features":{"type":1,"values":[6412.14350001,888.0,1407.0,1.5844594594594594,10.614,12.07,0.12062966031483012,0.9991237664152219,6.065,0.49751449875724935]}}

{"label":72.3,"features":{"type":1,"values":[6306.04450001,1084.0,1451.0,1.338560885608856,7.018,12.04,0.41710963455149497,0.9992054343916128,6.05,0.4975083056478405]}}

{"label":76.7,"features":{"type":1,"values":[6142.9203,1494.0,1437.0,0.9618473895582329,7.939,12.06,0.34170812603648426,0.9992216101762574,6.06,0.49751243781094534]}}

val lr = new LinearRegression().setMaxIter(300).setFeaturesCol("features")

val lrModel = lr.fit(assembleddata)

Any clues or input would be appreciated.


Regards,

Shawn


build models in parallel

2016-11-29 Thread Xiaomeng Wan
I want to divide big data into groups (e.g. group by some id) and build one
model for each group. I am wondering whether I can parallelize the model-building
process by implementing a UDAF (e.g. running linear regression in its
evaluate method). Is this good practice? Does anybody have experience with it? Thanks!
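
For illustration, a rough sketch of a non-UDAF way to get one model per group
(not from the original post; the DataFrame df, a string column "id", and
pre-assembled "features"/"label" columns are assumptions):

import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.functions.col

// Collect the group keys, then fit the groups concurrently from the driver.
val groupKeys = df.select("id").distinct().collect().map(_.getString(0))

groupKeys.par.foreach { key =>
  val groupData = df.filter(col("id") === key)
  val model = new LinearRegression()
    .setFeaturesCol("features")
    .setLabelCol("label")
    .fit(groupData)
  model.write.overwrite().save(s"models/$key")  // hypothetical output path
}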

Regards,
Shawn


Re: how to see Pipeline model information

2016-11-24 Thread Xiaomeng Wan
Here is the Scala code I use to get the best model; I have never used Java.

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new RegressionEvaluator)
  .setEstimatorParamMaps(paramGrid)

val cvModel = cv.fit(data)

val plmodel = cvModel.bestModel.asInstanceOf[PipelineModel]

val lrModel = plmodel.stages(0).asInstanceOf[LinearRegressionModel]
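
For the GBT question quoted below, a similar extraction might look like this
(a rough sketch; it assumes the fitted PipelineModel is called pipelineModel and
the CrossValidator is its third stage, as in the quoted Java code):

import org.apache.spark.ml.classification.GBTClassificationModel
import org.apache.spark.ml.tuning.CrossValidatorModel

val cvStageModel = pipelineModel.stages(2).asInstanceOf[CrossValidatorModel]
val gbtModel = cvStageModel.bestModel.asInstanceOf[GBTClassificationModel]
println(gbtModel.toDebugString)  // prints the individual trees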

On 24 November 2016 at 10:23, Zhiliang Zhu <zchl.j...@yahoo.com> wrote:

> Hi Xiaomeng,
>
> Thanks very much for your comment, which is helpful for me.
>
> However, it seems I am hitting more issues with XXXClassifier and
> XXXClassificationModel, as in the code below:
>
> ...
> GBTClassifier gbtModel = new GBTClassifier();
> ParamMap[] grid = new ParamGridBuilder()
> .addGrid(gbtModel.maxIter(), new int[] {5})
> .addGrid(gbtModel.maxDepth(), new int[] {5})
> .build();
>
> CrossValidator crossValidator = new CrossValidator()
> .setEstimator(gbtModel) //rfModel
> .setEstimatorParamMaps(grid)
> .setEvaluator(new BinaryClassificationEvaluator())
> .setNumFolds(6);
>
> Pipeline pipeline = new Pipeline()
> .setStages(new PipelineStage[] {labelIndexer, vectorSlicer,
> crossValidator});
>
> PipelineModel plModel = pipeline.fit(data);
> ArrayList m = new ArrayList ();
> m.add(plModel);
> JAVA_SPARK_CONTEXT.parallelize(m, 1).saveAsObjectFile(this.outputPath
> + POST_MODEL_PATH);
>
> Transformer[] stages = plModel.stages();
> Transformer cvStage = stages[2];
> CrossValidator crossV = new TR2CVConversion(cvStage).getInstanceOfCrossValidator();
> // call a self-defined Scala class
> Estimator estimator = crossV.getEstimator();
>
> GBTClassifier gbt = (GBTClassifier)estimator;
>
> // All of the above compiles, but the next line does not.
> // GBTClassifier does not seem to expose much detailed model description to get,
> // but GBTClassificationModel.toString() would give the specific trees,
> // which is just what I want.
>
> GBTClassificationModel model = (GBTClassificationModel)get;  // does not compile
>
>
> So how can I get the specific trees or the forest from the model?
> Thanks in advance~
>
> Zhiliang
>
>
>
>
>
>
>
>
>
>
> On Thursday, November 24, 2016 2:15 AM, Xiaomeng Wan <shawn...@gmail.com>
> wrote:
>
>
> You can use pipelinemodel.stages(0).asInstanceOf[RandomForestModel]. The
> stage number (0 in the example) depends on the order in which you called setStages.
>
> Shawn
>
> On 23 November 2016 at 10:21, Zhiliang Zhu <zchl.j...@yahoo.com.invalid>
> wrote:
>
>
> Dear All,
>
> I am building a model with a Spark pipeline, and in the pipeline I used the
> Random Forest algorithm as a stage.
> If I just use Random Forest without a pipeline, I can see information about
> the forest through the API, e.g. rfModel.toDebugString() and rfModel.toString().
>
> However, when it comes to a pipeline, how can I check the algorithm
> information, such as the trees, or the threshold selected by LR, etc.?
>
> Thanks in advance~~
>
> zhiliang
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
>
>


Re: how to see Pipeline model information

2016-11-23 Thread Xiaomeng Wan
You can use pipelinemodel.stages(0).asInstanceOf[RandomForestModel]. The
stage number (0 in the example) depends on the order in which you called setStages.
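
A minimal sketch of inspecting that stage (not from the original reply; the
RandomForestClassificationModel class and the pipelineModel name are assumptions
for a spark.ml classification pipeline):

import org.apache.spark.ml.classification.RandomForestClassificationModel

val rfModel = pipelineModel.stages(0).asInstanceOf[RandomForestClassificationModel]
println(rfModel.toDebugString)  // prints the individual trees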

Shawn

On 23 November 2016 at 10:21, Zhiliang Zhu 
wrote:

>
> Dear All,
>
> I am building a model with a Spark pipeline, and in the pipeline I used the
> Random Forest algorithm as a stage.
> If I just use Random Forest without a pipeline, I can see information about
> the forest through the API, e.g. rfModel.toDebugString() and rfModel.toString().
>
> However, when it comes to a pipeline, how can I check the algorithm
> information, such as the trees, or the threshold selected by LR, etc.?
>
> Thanks in advance~~
>
> zhiliang
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Partitioning Strategy with Parquet

2016-11-17 Thread Xiaomeng Wan
You can partition on the first n letters of userid, for example:
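
A rough sketch of that idea (a DataFrame df is assumed; the column names come from
the question below and the prefix length is arbitrary):

import org.apache.spark.sql.functions.{col, substring}

df.withColumn("user_prefix", substring(col("userid"), 1, 2))  // first two letters of userid
  .write
  .format("parquet")
  .partitionBy("user_prefix")
  .save("hdfs://path")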

On 17 November 2016 at 08:25, titli batali  wrote:

> Hi,
>
> I have a use case where we have 1000 CSV files with a column user_Id and
> 8 million unique users. The data contains userid, date, transaction,
> and we run some queries over it.
>
> We have a case where we need to iterate over each transaction on a
> particular date for each user. There are three nested loops:
>
> for(user){
>   for(date){
> for(transactions){
>   //Do Something
>   }
>}
> }
>
> i.e. we do a similar thing for every (date, transaction) tuple for a
> particular user. In order to get away from the loop structure and decrease the
> processing time, we are converting the CSV files to Parquet and
> partitioning them by userid:
> df.write.format("parquet").partitionBy("useridcol").save("hdfs://path").
>
> So while reading the Parquet files, we read a particular user from a
> particular partition, create a Cartesian product of (date X transaction),
> and work on the tuples in each partition to achieve the above level of
> nesting. Is partitioning on 8 million users a bad option? What could be
> a better way to achieve this?
>
> Thanks
>
>
>


load large number of files from s3

2016-11-11 Thread Xiaomeng Wan
Hi,
We have 30 million small files (100k each) on S3. I want to know how bad it
is to load them directly from S3 (e.g. driver memory, IO, executor memory,
S3 reliability) before merging or distcp-ing them. Does anybody have
experience with this? Thanks in advance!

Regards,
Shawn


read large number of files on s3

2016-11-08 Thread Xiaomeng Wan
Hi,
We have 30 million small (100k each) files on S3 to process. I am thinking
about something like the below to load them in parallel:

val data = sc.union(sc.wholeTextFiles("s3a://.../*.json").map(...)).toDF()
data.createOrReplaceTempView("data")

How should I estimate the driver memory this needs? Is there a better
practice, or should I merge the files in a preprocessing step? Thanks in advance.

Regards,
Shawn