Re: Will be in around 12:30pm due to some personal stuff

2017-01-19 Thread Gavin Yue
PST or EST?

> On Jan 19, 2017, at 21:55, ayan guha  wrote:
> 
> Sure...we will wait :) :)
> 
> Just kidding
> 
>> On Fri, Jan 20, 2017 at 4:48 PM, Manohar753 
>>  wrote:
>> View this message in context: Will be in around 12:30pm due to some personal 
>> stuff
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> 
> 
> -- 
> Best Regards,
> Ayan Guha


Re: Deep learning libraries for scala

2016-09-30 Thread Gavin Yue
You could try Skymind (Deeplearning4j). It is Java.

I have never tested it, though.

> On Sep 30, 2016, at 7:30 PM, janardhan shetty  wrote:
> 
> Hi,
> 
> Are there any good libraries which can be used for scala deep learning models 
> ?
> How can we integrate tensorflow with scala ML ?

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Re[6]: Spark 2.0: SQL runs 5x times slower when adding 29th field to aggregation.

2016-09-03 Thread Gavin Yue
Any shuffling? 
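For what it's worth, one way to see what changes at the cliff is to diff the physical plans just below and above the 57-column threshold. A minimal sketch for the Spark 2.0 shell, reusing the schema and CSV path from the session quoted below:

import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.types._
import spark.implicits._

val schema = StructType(Seq(StructField("dd_convs", FloatType, true)))
val df = spark.read.schema(schema).csv("file:///data/dump/test_csv")

// Build the same aggregation with a configurable number of sum() columns,
// then print the physical plan instead of collecting the result.
def plan(n: Int): Unit = {
  val cols = Seq.fill(n)(sum($"dd_convs"))
  df.groupBy().agg(cols.head, cols.tail: _*).explain(true)
}

plan(57)  // fast case
plan(58)  // slow case: compare the two plans for codegen/shuffle differences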


> On Sep 3, 2016, at 5:50 AM, Сергей Романов  wrote:
> 
> Same problem happens with CSV data file, so it's not parquet-related either.
> 
> 
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 2.0.0
>   /_/
> 
> Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
> SparkSession available as 'spark'.
> >>> import timeit
> >>> from pyspark.sql.types import *
> >>> schema = StructType([StructField('dd_convs', FloatType(), True)])
> >>> for x in range(50, 70): print x, 
> >>> timeit.timeit(spark.read.csv('file:///data/dump/test_csv', 
> >>> schema=schema).groupBy().sum(*(['dd_convs'] * x) ).collect, number=1)
> 50 0.372850894928
> 51 0.376906871796
> 52 0.381325960159
> 53 0.385444164276
> 54 0.38685192
> 55 0.388918161392
> 56 0.397624969482
> 57 0.391713142395
> 58 2.62714004517
> 59 2.68421196938
> 60 2.74627685547
> 61 2.81081581116
> 62 3.43532109261
> 63 3.07742786407
> 64 3.03904604912
> 65 3.01616096497
> 66 3.06293702126
> 67 3.09386610985
> 68 3.27610206604
> 69 3.2041969299
> 
> Saturday, 3 September 2016, 15:40 +03:00, from Sergei Romanov 
> :
> 
> Hi,
> 
> I had narrowed down my problem to a very simple case. I'm sending 27kb 
> parquet in attachment. (file:///data/dump/test2 in example)
> 
> Please, can you take a look at it? Why there is performance drop after 57 sum 
> columns?
> 
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 2.0.0
>   /_/
> 
> Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
> SparkSession available as 'spark'.
> >>> import timeit
> >>> for x in range(70): print x, 
> >>> timeit.timeit(spark.read.parquet('file:///data/dump/test2').groupBy().sum(*(['dd_convs']
> >>>  * x) ).collect, number=1)
> ... 
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> 0 1.05591607094
> 1 0.200426101685
> 2 0.203800916672
> 3 0.176458120346
> 4 0.184863805771
> 5 0.232321023941
> 6 0.216032981873
> 7 0.201778173447
> 8 0.292424917221
> 9 0.228524923325
> 10 0.190534114838
> 11 0.197028160095
> 12 0.270443916321
> 13 0.429781913757
> 14 0.270851135254
> 15 0.776989936829
> 16 0.27879181
> 17 0.227638959885
> 18 0.212944030762
> 19 0.2144780159
> 20 0.22200012207
> 21 0.262261152267
> 22 0.254227876663
> 23 0.275084018707
> 24 0.292124032974
> 25 0.280488014221
> 16/09/03 15:31:28 WARN Utils: Truncated the string representation of a plan 
> since it was too large. This behavior can be adjusted by setting 
> 'spark.debug.maxToStringFields' in SparkEnv.conf.
> 26 0.290093898773
> 27 0.238478899002
> 28 0.246420860291
> 29 0.241401195526
> 30 0.255286931992
> 31 0.42702794075
> 32 0.327946186066
> 33 0.434395074844
> 34 0.314198970795
> 35 0.34576010704
> 36 0.278323888779
> 37 0.289474964142
> 38 0.290827989578
> 39 0.376291036606
> 40 0.347742080688
> 41 0.363158941269
> 42 0.318687915802
> 43 0.376327991486
> 44 0.374994039536
> 45 0.362971067429
> 46 0.425967931747
> 47 0.370860099792
> 48 0.443903923035
> 49 0.374128103256
> 50 0.378985881805
> 51 0.476850986481
> 52 0.451028823853
> 53 0.432540893555
> 54 0.514838933945
> 55 0.53990483284
> 56 0.449142932892
> 57 0.465240001678 // 5x slower after 57 columns
> 58 2.40412116051
> 59 2.41632795334
> 60 2.41812801361
> 61 2.55726218224
> 62 2.55484509468
> 63 2.56128406525
> 64 2.54642391205
> 65 2.56381797791
> 66 2.56871509552
> 67 2.66187620163
> 68 2.63496208191
> 69 2.8154599
> 
> 
> 
> Sergei Romanov
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: EMR for spark job - instance type suggestion

2016-08-26 Thread Gavin Yue
I tried both M4 and R3.  R3 is slightly more expensive, but has more memory.

If you are doing a lot of in-memory work, like joins, I recommend R3.

Otherwise M4 is fine.  Also, remember that M4 is EBS-only, so you have to
pay the additional EBS cost as well.



On Fri, Aug 26, 2016 at 10:29 AM, Saurabh Malviya (samalviy) <
samal...@cisco.com> wrote:

> We are going to use EMR cluster for spark jobs in aws. Any suggestion for
> instance type to be used.
>
>
>
> M3.xlarge or r3.xlarge.
>
>
>
> Details:
>
> 1)  We are going to run couple of streaming jobs so we need on demand
> instance type.
>
> 2)  There is no data on hdfs/s3 all data pull from kafka or elastic
> search
>
>
>
>
>
> -Saurabh
>


How to output RDD to one file with specific name?

2016-08-25 Thread Gavin Yue
I am trying to output RDD to disk by

rdd.coalesce(1).saveAsTextFile("/foo")

It outputs to the /foo directory with a single file named part-00000.

Is there a way I could directly save the file as /foo/somename ?

Thanks.
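For reference, a common workaround (a sketch; the paths below are placeholders) is to write the single-partition output and then rename the part file with the Hadoop FileSystem API:

import org.apache.hadoop.fs.{FileSystem, Path}

rdd.coalesce(1).saveAsTextFile("/foo_tmp")

// Rename the single part file to the desired name, then clean up the temp dir.
// The destination directory must already exist for rename to succeed.
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.mkdirs(new Path("/foo"))
fs.rename(new Path("/foo_tmp/part-00000"), new Path("/foo/somename"))
fs.delete(new Path("/foo_tmp"), true)

Spark itself always writes a directory of part files, so some post-processing like this is needed to end up with a single, specifically named file.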


Re: Java Recipes for Spark

2016-07-29 Thread Gavin Yue
This is useful:) 

Thank you for sharing. 



> On Jul 29, 2016, at 1:30 PM, Jean Georges Perrin  wrote:
> 
> Sorry if this looks like a shameless self promotion, but some of you asked me 
> to say when I'll have my Java recipes for Apache Spark updated. It's done 
> here: http://jgp.net/2016/07/22/spark-java-recipes/ and in the GitHub repo. 
> 
> Enjoy / have a great week-end.
> 
> jg
> 
> 


Re: Running Spark in Standalone or local modes

2016-06-11 Thread Gavin Yue
Sorry, I had a typo.

I meant that Spark does not use YARN or Mesos in standalone mode...



> On Jun 11, 2016, at 14:35, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> 
> Hi Gavin,
> 
> I believe in standalone mode a simple cluster manager is included with Spark 
> that makes it easy to set up a cluster. It does not rely on YARN or Mesos.
> 
> In summary this is from my notes:
> 
> Spark Local - Spark runs on the local host. This is the simplest set up and 
> best suited for learners who want to understand different concepts of Spark 
> and those performing unit testing.
> Spark Standalone – a simple cluster manager included with Spark that makes it 
> easy to set up a cluster.
> YARN Cluster Mode, the Spark driver runs inside an application master process 
> which is managed by YARN on the cluster, and the client can go away after 
> initiating the application.
> Mesos. I have not used it so cannot comment
> YARN Client Mode, the driver runs in the client process, and the application 
> master is only used for requesting resources from YARN. Unlike Local or Spark 
> standalone modes, in which the master’s address is specified in the --master 
> parameter, in YARN mode the ResourceManager’s address is picked up from the 
> Hadoop configuration. Thus, the --master parameter is yarn
> 
> HTH
> 
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
>  
> 
>> On 11 June 2016 at 22:26, Gavin Yue <yue.yuany...@gmail.com> wrote:
>> The standalone mode is against Yarn mode or Mesos mode, which means spark 
>> uses Yarn or Mesos as cluster managements. 
>> 
>> Local mode is actually a standalone mode which everything runs on the single 
>> local machine instead of remote clusters.
>> 
>> That is my understanding. 
>> 
>> 
>>> On Sat, Jun 11, 2016 at 12:40 PM, Ashok Kumar 
>>> <ashok34...@yahoo.com.invalid> wrote:
>>> Thank you for grateful
>>> 
>>> I know I can start spark-shell by launching the shell itself
>>> 
>>> spark-shell 
>>> 
>>> Now I know that in standalone mode I can also connect to master
>>> 
>>> spark-shell --master spark://:7077
>>> 
>>> My point is what are the differences between these two start-up modes for 
>>> spark-shell? If I start spark-shell and connect to master what performance 
>>> gain will I get if any or it does not matter. Is it the same as for 
>>> spark-submit 
>>> 
>>> regards
>>> 
>>> 
>>> On Saturday, 11 June 2016, 19:39, Mohammad Tariq <donta...@gmail.com> wrote:
>>> 
>>> 
>>> Hi Ashok,
>>> 
>>> In local mode all the processes run inside a single jvm, whereas in 
>>> standalone mode we have separate master and worker processes running in 
>>> their own jvms.
>>> 
>>> To quickly test your code from within your IDE you could probable use the 
>>> local mode. However, to get a real feel of how Spark operates I would 
>>> suggest you to have a standalone setup as well. It's just the matter of 
>>> launching a standalone cluster either manually(by starting a master and 
>>> workers by hand), or by using the launch scripts provided with Spark 
>>> package. 
>>> 
>>> You can find more on this here.
>>> 
>>> HTH
>>> 
>>>  
>>> 
>>> Tariq, Mohammad
>>> about.me/mti
>>> 
>>> 
>>> 
>>>  
>>> 
>>> On Sat, Jun 11, 2016 at 11:38 PM, Ashok Kumar 
>>> <ashok34...@yahoo.com.invalid> wrote:
>>> Hi,
>>> 
>>> What is the difference between running Spark in Local mode or standalone 
>>> mode?
>>> 
>>> Are they the same. If they are not which is best suited for non prod work.
>>> 
>>> I am also aware that one can run Spark in Yarn mode as well.
>>> 
>>> Thanks
> 


Re: Running Spark in Standalone or local modes

2016-06-11 Thread Gavin Yue
The standalone mode is against Yarn mode or Mesos mode, which means spark
uses Yarn or Mesos as cluster managements.

Local mode is actually a standalone mode which everything runs on the
single local machine instead of remote clusters.

That is my understanding.
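To make the distinction concrete, here is a minimal sketch (the master URL and host below are placeholders) showing that only the master setting changes between the two modes:

import org.apache.spark.{SparkConf, SparkContext}

// Local mode: driver, scheduler and executors all live in this one JVM.
val localConf = new SparkConf().setAppName("mode-demo").setMaster("local[*]")

// Standalone mode: Spark's own master/worker daemons manage the cluster.
val standaloneConf = new SparkConf().setAppName("mode-demo").setMaster("spark://master-host:7077")

val sc = new SparkContext(localConf)   // swap in standaloneConf to run on the cluster
println(sc.parallelize(1 to 1000).sum())
sc.stop()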


On Sat, Jun 11, 2016 at 12:40 PM, Ashok Kumar 
wrote:

> Thank you for grateful
>
> I know I can start spark-shell by launching the shell itself
>
> spark-shell
>
> Now I know that in standalone mode I can also connect to master
>
> spark-shell --master spark://:7077
>
> My point is what are the differences between these two start-up modes for
> spark-shell? If I start spark-shell and connect to master what performance
> gain will I get if any or it does not matter. Is it the same as for 
> spark-submit
>
>
>
> regards
>
>
> On Saturday, 11 June 2016, 19:39, Mohammad Tariq 
> wrote:
>
>
> Hi Ashok,
>
> In local mode all the processes run inside a single jvm, whereas in
> standalone mode we have separate master and worker processes running in
> their own jvms.
>
> To quickly test your code from within your IDE you could probable use the
> local mode. However, to get a real feel of how Spark operates I would
> suggest you to have a standalone setup as well. It's just the matter
> of launching a standalone cluster either manually(by starting a master and
> workers by hand), or by using the launch scripts provided with Spark
> package.
>
> You can find more on this *here*
> .
>
> HTH
>
>
>
>
> Tariq, Mohammad
> about.me/mti
>
>
>
> On Sat, Jun 11, 2016 at 11:38 PM, Ashok Kumar <
> ashok34...@yahoo.com.invalid> wrote:
>
> Hi,
>
> What is the difference between running Spark in Local mode or standalone
> mode?
>
> Are they the same. If they are not which is best suited for non prod work.
>
> I am also aware that one can run Spark in Yarn mode as well.
>
> Thanks
>
>
>
>
>


Re: HIVE Query 25x faster than SPARK Query

2016-06-10 Thread Gavin Yue
Yes, because in the second query you did a (select PK from A) A. I guess the
subquery makes the intermediate result much smaller and enables a broadcast
join, so it is much faster.

You could use df.explain() to check the execution plan.
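For example, something like this (a sketch; the table and column names are taken from the thread, and explain() is the Spark 1.6-era way to dump the plan):

import org.apache.spark.sql.functions.broadcast

val a = sqlContext.table("A").select("PK")   // pruned projection, small enough to broadcast
val b = sqlContext.table("B")

// Hint the broadcast join explicitly and print the physical plan to confirm it.
val joined = b.join(broadcast(a), b("FK") === a("PK"))
joined.explain(true)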


On Fri, Jun 10, 2016 at 1:41 AM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi,
>
> I think if we try to see why is Query 2 faster than Query 1 then all the
> answers will be given without beating around the bush. That is the right
> way to find out what is happening and why.
>
>
> Regards,
> Gourav
>
> On Thu, Jun 9, 2016 at 11:19 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>
>> Could you print out the sql execution plan? My guess is about broadcast
>> join.
>>
>>
>>
>> On Jun 9, 2016, at 07:14, Gourav Sengupta <gourav.sengu...@gmail.com>
>> wrote:
>>
>> Hi,
>>
>> Query1 is almost 25x faster in HIVE than in SPARK. What is happening here
>> and is there a way we can optimize the queries in SPARK without the obvious
>> hack in Query2.
>>
>>
>> ---
>> ENVIRONMENT:
>> ---
>>
>> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3
>> million rows. Both the files are single gzipped csv file.
>> > Both table A and B are external tables in AWS S3 and created in HIVE
>> accessed through SPARK using HiveContext
>> > EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using
>> allowMaximumResource allocation and node types are c3.4xlarge).
>>
>> --
>> QUERY1:
>> --
>> select A.PK, B.FK
>> from A
>> left outer join B on (A.PK = B.FK)
>> where B.FK is not null;
>>
>>
>>
>> This query takes 4 mins in HIVE and 1.1 hours in SPARK
>>
>>
>> --
>> QUERY 2:
>> --
>>
>> select A.PK, B.FK
>> from (select PK from A) A
>> left outer join B on (A.PK = B.FK)
>> where B.FK is not null;
>>
>> This query takes 4.5 mins in SPARK
>>
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>>
>>
>


Re: HIVE Query 25x faster than SPARK Query

2016-06-09 Thread Gavin Yue
Could you print out the sql execution plan? My guess is about broadcast join. 



> On Jun 9, 2016, at 07:14, Gourav Sengupta  wrote:
> 
> Hi,
> 
> Query1 is almost 25x faster in HIVE than in SPARK. What is happening here and 
> is there a way we can optimize the queries in SPARK without the obvious hack 
> in Query2.
> 
> 
> ---
> ENVIRONMENT:
> ---
> 
> > Table A 533 columns x 24 million rows and Table B has 2 columns x 3 million 
> > rows. Both the files are single gzipped csv file.
> > Both table A and B are external tables in AWS S3 and created in HIVE 
> > accessed through SPARK using HiveContext
> > EMR 4.6, Spark 1.6.1 and Hive 1.0.0 (clusters started using 
> > allowMaximumResource allocation and node types are c3.4xlarge).
> 
> --
> QUERY1: 
> --
> select A.PK, B.FK
> from A 
> left outer join B on (A.PK = B.FK)
> where B.FK is not null;
> 
> 
> 
> This query takes 4 mins in HIVE and 1.1 hours in SPARK 
> 
> 
> --
> QUERY 2:
> --
> 
> select A.PK, B.FK
> from (select PK from A) A 
> left outer join B on (A.PK = B.FK)
> where B.FK is not null;
> 
> This query takes 4.5 mins in SPARK 
> 
> 
> 
> Regards,
> Gourav Sengupta
> 
> 
> 


Re: Behaviour of RDD sampling

2016-05-31 Thread Gavin Yue
If you don't read the whole dataset, how do you know the total number of records?
And without knowing the total number, how do you choose 30%?
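To be precise about what sample() does, here is a small sketch (the path is a placeholder): the transformation is applied per record during the scan, so every input block is still read; only the downstream work shrinks.

val all = sc.textFile("hdfs:///data/jsons")            // full scan still happens here
val frac = all.sample(withReplacement = false, fraction = 0.3, seed = 42L)
println(frac.count())

If you truly want to read less from disk, you have to sample at the file level, e.g. pass only a subset of the input paths to textFile, before Spark ever sees the data.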



> On May 31, 2016, at 00:45, pbaier  wrote:
> 
> Hi all,
> 
> I have to following use case:
> I have around 10k of jsons that I want to use for learning.
> The jsons are all stored in one file.
> 
> For learning a ML model, however, I only need around 30% of the jsons (the
> rest is not needed at all).
> So, my idea was to load all data into a RDD and then use the rdd.sample
> method to get my fraction of the data.
> I implemented this, and in the end it took as long as loading the whole data
> set.
> So I was wondering if Spark is still loading the whole dataset from disk and
> does the filtering afterwards?
> If this is the case, why does Spark not push down the filtering and load
> only a fraction of data from the disk?
> 
> Cheers,
> 
> Patrick
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Behaviour-of-RDD-sampling-tp27052.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is there a way to merge parquet small files?

2016-05-19 Thread Gavin Yue
For log files I would suggest saving them as gzipped text files first. After
aggregation, convert them into parquet by merging a few files at a time.
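A minimal sketch of the compaction step (paths and the target file count are placeholders):

// Read the many small parquet files for one window and rewrite them
// as a handful of larger files; coalesce avoids a full shuffle.
val small = sqlContext.read.parquet("/logs/parquet/2016-05-19")
small.coalesce(32).write.parquet("/logs/parquet-compacted/2016-05-19")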



> On May 19, 2016, at 22:32, Deng Ching-Mallete  wrote:
> 
> IMO, it might be better to merge or compact the parquet files instead of 
> keeping lots of small files in the HDFS. Please refer to [1] for more info. 
> 
> We also encountered the same issue with the slow query, and it was indeed 
> caused by the many small parquet files. In our case, we were processing large 
> data sets with batch jobs instead of a streaming job. To solve our issue, we 
> just did a coalesce to reduce the number of partitions before saving as 
> parquet format. 
> 
> HTH,
> Deng
> 
> [1] http://blog.cloudera.com/blog/2009/02/the-small-files-problem/
> 
>> On Fri, May 20, 2016 at 1:50 PM, 王晓龙/0515  
>> wrote:
>> I’m using a spark streaming program to store log message into parquet file 
>> every 10 mins.
>> Now, when I query the parquet, it usually takes hundreds of thousands of 
>> stages to compute a single count.
>> I looked into the parquet file’s path and find a great amount of small files.
>> 
>> Do the small files caused the problem? Can I merge them, or is there a 
>> better way to solve it?
>> 
>> Lots of thanks.
>> 
>> 
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 


Any NLP lib could be used on spark?

2016-04-19 Thread Gavin Yue
Hey,

Want to try the NLP on the spark. Could anyone recommend any easy to run
NLP open source lib on spark?

Also is there any recommended semantic network?

Thanks a lot.


Re: Spark and N-tier architecture

2016-03-29 Thread Gavin Yue
N tiers or layers are mainly about separating a big problem into smaller
pieces, so the idea is always valid.

It just means different things for different applications.

Speaking of offline analytics and the big-data ecosystem, there are numerous
ways of slicing the problem into tiers/layers. You could search for
"YARN/Mesos/Spark layer" and will find a lot of results and slide decks.



On Tue, Mar 29, 2016 at 4:44 PM, Mich Talebzadeh 
wrote:

> Hi Mark,
>
> I beg I agree to differ on the interpretation of N-tier architecture.
> Agreed that 3-tier and by extrapolation N-tier have been around since days
> of client-server architecture. However, they are as valid today as 20 years
> ago. I believe the main recent expansion of n-tier has been on horizontal
> scaling and Spark by means of its clustering capability contributes to this
> model.
>
> Cheers
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 30 March 2016 at 00:22, Mark Hamstra  wrote:
>
>> Yes and no.  The idea of n-tier architecture is about 20 years older than
>> Spark and doesn't really apply to Spark as n-tier was original conceived.
>> If the n-tier model helps you make sense of some things related to Spark,
>> then use it; but don't get hung up on trying to force a Spark architecture
>> into an outdated model.
>>
>> On Tue, Mar 29, 2016 at 5:02 PM, Ashok Kumar <
>> ashok34...@yahoo.com.invalid> wrote:
>>
>>> Thank you both.
>>>
>>> So am I correct that Spark fits in within the application tier in N-tier
>>> architecture?
>>>
>>>
>>> On Tuesday, 29 March 2016, 23:50, Alexander Pivovarov <
>>> apivova...@gmail.com> wrote:
>>>
>>>
>>> Spark is a distributed data processing engine plus distributed in-memory
>>> / disk data cache
>>>
>>> spark-jobserver provides REST API to your spark applications. It allows
>>> you to submit jobs to spark and get results in sync or async mode
>>>
>>> It also can create long running Spark context to cache RDDs in memory
>>> with some name (namedRDD) and then use it to serve requests from multiple
>>> users. Because RDD is in memory response should be super fast (seconds)
>>>
>>> https://github.com/spark-jobserver/spark-jobserver
>>>
>>>
>>> On Tue, Mar 29, 2016 at 2:50 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>> Interesting question.
>>>
>>> The most widely used application of N-tier is the traditional three-tier
>>> architecture that has been the backbone of Client-server architecture by
>>> having presentation layer, application layer and data layer. This is
>>> primarily for performance, scalability and maintenance. The most profound
>>> changes that Big data space has introduced to N-tier architecture is the
>>> concept of horizontal scaling as opposed to the previous tiers that relied
>>> on vertical scaling. HDFS is an example of horizontal scaling at the data
>>> tier by adding more JBODS to storage. Similarly adding more nodes to Spark
>>> cluster should result in better performance.
>>>
>>> Bear in mind that these tiers are at Logical levels which means that
>>> there or may not be so many so many physical layers. For example multiple
>>> virtual servers can be hosted on the same physical server.
>>>
>>> With regard to Spark, it is effectively a powerful query tools that sits
>>> in between the presentation layer (say Tableau) and the HDFS or Hive as you
>>> alluded. In that sense you can think of Spark as part of the application
>>> layer that communicates with the backend via a number of protocols
>>> including the standard JDBC. There is rather a blurred vision here whether
>>> Spark is a database or query tool. IMO it is a query tool in a sense that
>>> Spark by itself does not have its own storage concept or metastore. Thus it
>>> relies on others to provide that service.
>>>
>>> HTH
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> On 29 March 2016 at 22:07, Ashok Kumar 
>>> wrote:
>>>
>>> Experts,
>>>
>>> One of terms used and I hear is N-tier architecture within Big Data used
>>> for availability, performance etc. I also hear that Spark by means of its
>>> query engine and in-memory caching fits into middle tier (application
>>> layer) with HDFS and Hive may be providing the data tier.  Can someone
>>> elaborate the role of Spark here. For example A Scala program that we write
>>> uses JDBC to talk to databases so in that sense is Spark a middle tier
>>> application?
>>>
>>> I hope that someone can clarify this and if so what would the best
>>> practice in 

Re: Spark and N-tier architecture

2016-03-29 Thread Gavin Yue
It is a separate project, as far as I understand. I am evaluating it right now.


> On Mar 29, 2016, at 16:17, Michael Segel  wrote:
> 
> 
> 
>> Begin forwarded message:
>> 
>> From: Michael Segel 
>> Subject: Re: Spark and N-tier architecture
>> Date: March 29, 2016 at 4:16:44 PM MST
>> To: Alexander Pivovarov 
>> Cc: Mich Talebzadeh , Ashok Kumar 
>> , User 
>> 
>> So… 
>> 
>> Is spark-jobserver an official part of spark or something else? 
>> 
>> From what I can find via a quick Google … this isn’t part of the core spark 
>> distribution.
>> 
>>> On Mar 29, 2016, at 3:50 PM, Alexander Pivovarov  
>>> wrote:
>>> 
>>> https://github.com/spark-jobserver/spark-jobserver
> 


Re: 回复: a new FileFormat 5x~100x faster than parquet

2016-02-22 Thread Gavin Yue
I recommend you provide more information. Using an inverted index certainly
speeds up query time when the index is hit, but it makes creates and inserts
take longer.

Is the source code not available at this moment? 

Thanks 
Gavin 

> On Feb 22, 2016, at 20:27, 开心延年  wrote:
> 
> If Apache likes this project, of course we will provide the source code.
> 
> But if Apache dislikes the project, we will continue to improve it ourselves.
> 
> ya100 and ydb can process up to 180 billion rows per day for near-real-time import.
> 
> Because of the index, a search over 1,800 billion rows (10 days of data) returns in 10 seconds.
> 
> 
> -- Original Message --
> From: "Akhil Das";;
> Sent: Monday, 22 February 2016, 8:42 PM
> To: "开心延年";
> Cc: "user"; "dev";
> Subject: Re: a new FileFormat 5x~100x faster than parquet
> 
> Would be good to see the source code and the documentation in English.
> 
> Thanks
> Best Regards
> 
>> On Mon, Feb 22, 2016 at 4:44 PM, 开心延年  wrote:
>> Ya100 is a FileFormat 5x~100x faster than parquet.
>> We can get Ya100 from this link:
>> https://github.com/ycloudnet/ya100/tree/master/v1.0.8
>> 
>> 
>> 
>> 
>> 1. We use an inverted index, so we skip the rows that we don't need.
>>   for example  the trade log search SQL
>>
>> select
>> (1)phonenum,usernick,ydb_grade,ydb_age,ydb_blood,ydb_zhiye,ydb_earn,ydb_day,amtlong
>>  
>> from spark_txt where  
>> (2)tradeid=' 2014012213870282671'
>> limit 10;
>> 
>>  This SQL is composed of two parts:
>>  (1) part 1 returns the result, which has 9 columns
>>  (2) part 2 is the filter condition, filtering by tradeid
>>
>>   Let's guess which plan is faster:
>>  plan A: first read all 9 columns of the result, then filter by tradeid
>>  plan B: first filter by tradeid, then read the matching 9 columns of the result.
>> 
>> Ya100 chooses plan B.
>> 
>>  Performance comparison of Ya100's index vs parquet:
>> [benchmark chart omitted]
>> 
>> 
>> 2. TOP-N sort: we don't read the non-sort columns until the very last step.
>> 
>>   for example  we sort by the logtime
>> select
>> (1)phonenum,usernick,ydb_grade,ydb_age,ydb_blood,ydb_zhiye,ydb_earn,ydb_day,amtlong
>>  
>> from spark_txt
>> (2)order by logtime desc
>> limit 10;
>> 
>>   This SQL is composed of two parts:
>>  (1) part 1 returns the result, which has 9 columns
>>  (2) part 2 is the column to sort by
>>
>>   Let's guess which plan is faster:
>>  plan A: first read all 9 columns of the result, then sort by logtime
>>  plan B: first sort by logtime, then read the matching 9 columns of the result.
>>
>>    Ya100 chooses plan B.
>> 
>>  Performance comparison of Ya100's lazy read vs parquet:
>> [benchmark chart omitted]
>> 
>> 3. We use labels instead of the original values for grouping and sorting.
>> 
>> 
>> 
>> 1) In general, the data has a lot of repeated values, for example the sex field or the age field.
>> 2) If we store the original value, that wastes a lot of storage. So we make a small modification:
>> we additionally add a new field called the label. We sort the unique values of a field and then
>> give each term a unique number from beginning to end.
>> 3) We use the numeric value (which we call the label) instead of the original value. Labels are
>> stored with a fixed length, so the file can be read randomly.
>> 4) The label order is the same as the dictionary order, so calculations like ORDER BY or GROUP BY
>> only need to read the label; we don't need to read the original value.
>> 5) Some fields, like the sex field, only have 2 distinct values, so we only use 2 bits (not 2 bytes)
>> to store the label, which saves a lot of disk I/O.
>>  When all of the calculation is finished, we translate labels back to the original values via a dictionary.
>> 6) If many rows have the same original value, the original value is stored only once and read only once.
>> Problems this solves:
>> 1) Ya100's data is quite big; we don't have enough memory to load all values into memory.
>> 2) In real-time mode, data changes frequently; the cache is invalidated frequently by appends or
>> updates, and rebuilding the cache takes a lot of time and I/O.
>> 3) The original value is a string type. When sorting or grouping, string values need a lot of memory
>> and a lot of CPU time to compute hashcode/compare/equals, but labels are numbers and are fast.
>> 4) The label is a number; its type may be short, byte, or integer, depending on the max label number.
>> 
>> Two-phase search
>> Original approach:
>> 1) GROUP BY and ORDER BY use the original value; the real value may be a string type and much
>> larger, so it may need a lot of I/O.
>> 2) Comparing strings is slower than comparing integers.
>> Our 

Re: Creating HiveContext in Spark-Shell fails

2016-02-15 Thread Gavin Yue
This sqlContext is actually an instance of HiveContext; don't be confused by
the name.
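A quick way to check this in the shell (a sketch; the class name is what a Hive-enabled build reports):

println(sqlContext.getClass.getName)
// org.apache.spark.sql.hive.HiveContext  -> Hive support is already wired in
println(sqlContext.isInstanceOf[org.apache.spark.sql.hive.HiveContext])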



> On Feb 16, 2016, at 12:51, Prabhu Joseph  wrote:
> 
> Hi All,
> 
> On creating HiveContext in spark-shell, fails with 
> 
> Caused by: ERROR XSDB6: Another instance of Derby may have already booted the 
> database /SPARK/metastore_db.
> 
> Spark-Shell already has created metastore_db for SqlContext. 
> 
> Spark context available as sc.
> SQL context available as sqlContext.
> 
> But without HiveContext, i am able to query the data using SqlContext . 
> 
> scala>  var df = 
> sqlContext.read.format("com.databricks.spark.csv").option("header", 
> "true").option("inferSchema", "true").load("/SPARK/abc")
> df: org.apache.spark.sql.DataFrame = [Prabhu: string, Joseph: string]
> 
> So is there any real need for HiveContext inside Spark Shell. Is everything 
> that can be done with HiveContext, achievable with SqlContext inside Spark 
> Shell.
> 
> 
> 
> Thanks,
> Prabhu Joseph
> 
> 
> 
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How parquet file decide task number?

2016-02-03 Thread Gavin Yue
I am doing a simple count like:

sqlContext.read.parquet("path").count

I have only 5000 parquet files, but it generates over 2 tasks.

Each parquet file is converted from one gz text file.

Please give some advice.

Thanks


Re: How parquet file decide task number?

2016-02-03 Thread Gavin Yue
Found the answer. It is the block size.

Thanks.
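For anyone hitting the same thing, a quick way to inspect it (a sketch; the path is a placeholder): the read partitions come from splitting the files by block size, and you can cap the task count afterwards without a shuffle.

val df = sqlContext.read.parquet("/data/parquet")
println(df.rdd.partitions.length)   // number of read tasks Spark will schedule
df.coalesce(5000).count()           // fewer, larger tasks; no shuffle involved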

On Wed, Feb 3, 2016 at 5:05 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

> I am doing a simple count like:
>
> sqlContext.read.parquet("path").count
>
> I have only 5000 parquet files.  But generate over 2 tasks.
>
> Each parquet file is converted from one gz text file.
>
> Please give some advice.
>
> Thanks
>
>
>
>


Re: Too many tasks killed the scheduler

2016-01-11 Thread Gavin Yue
Thank you for the suggestion.

I tried df.coalesce(1000).write.parquet() and yes, the number of parquet
files drops to 1000, but the partition count of the parquet is still around
5000+. When I read the parquet back and do a count, it still runs 5000+ tasks.

So I guess I need to do a repartition here to drop the task number? But
repartition never works for me; it always fails with out-of-memory errors.
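For context, the difference between the two (a sketch; paths are placeholders): coalesce only narrows the existing partitions without a shuffle, so it is cheap, while repartition does a full shuffle, which is what runs out of memory here. When reading back, the task count is driven by file splits (block size), not by the writer's partition count.

val df = sqlContext.read.parquet("/data/day1")
df.coalesce(1000).write.parquet("/data/day1_coalesced")      // no shuffle, 1000 output files
// df.repartition(1000).write.parquet("/data/day1_shuffled") // full shuffle; OOM-prone at this scale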

And regarding the delay with a large number of tasks, I found a similar
issue: https://issues.apache.org/jira/browse/SPARK-7447.

I am unionAll-ing around 10 parquet folders, with 70K+ parquet files in
total, generating 70K+ tasks. It takes around 5-8 minutes before any tasks
start, just like the ticket above.

It also happens if I do partition discovery with a base path. Is there any
schema inference or checking going on that causes the slowness?

Thanks,
Gavin



On Mon, Jan 11, 2016 at 1:21 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com
> wrote:

> Could you use "coalesce" to reduce the number of partitions?
>
>
> Shixiong Zhu
>
>
> On Mon, Jan 11, 2016 at 12:21 AM, Gavin Yue <yue.yuany...@gmail.com>
> wrote:
>
>> Here is more info.
>>
>> The job stuck at:
>> INFO cluster.YarnScheduler: Adding task set 1.0 with 79212 tasks
>>
>> Then got the error:
>> Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out
>> after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
>>
>> So I increased spark.network.timeout from 120s to 600s.  It sometimes
>> works.
>>
>> Each task is a parquet file.  I could not repartition due to out of GC
>> problem.
>>
>> Is there any way I could to improve the performance?
>>
>> Thanks,
>> Gavin
>>
>>
>> On Sun, Jan 10, 2016 at 1:51 AM, Gavin Yue <yue.yuany...@gmail.com>
>> wrote:
>>
>>> Hey,
>>>
>>> I have 10 days data, each day has a parquet directory with over 7000
>>> partitions.
>>> So when I union 10 days and do a count, then it submits over 70K tasks.
>>>
>>> Then the job failed silently with one container exit with code 1.  The
>>> union with like 5, 6 days data is fine.
>>> In the spark-shell, it just hang showing: Yarn scheduler submit 7+
>>> tasks.
>>>
>>> I am running spark 1.6 over hadoop 2.7.  Is there any setting I could
>>> change to make this work?
>>>
>>> Thanks,
>>> Gavin
>>>
>>>
>>>
>>
>


Re: Too many tasks killed the scheduler

2016-01-11 Thread Gavin Yue
Here is more info.

The job stuck at:
INFO cluster.YarnScheduler: Adding task set 1.0 with 79212 tasks

Then got the error:
Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out
after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout

So I increased spark.network.timeout from 120s to 600s.  It sometimes
works.

Each task reads one parquet file.  I could not repartition due to GC
problems.

Is there any way I could to improve the performance?

Thanks,
Gavin


On Sun, Jan 10, 2016 at 1:51 AM, Gavin Yue <yue.yuany...@gmail.com> wrote:

> Hey,
>
> I have 10 days data, each day has a parquet directory with over 7000
> partitions.
> So when I union 10 days and do a count, then it submits over 70K tasks.
>
> Then the job failed silently with one container exit with code 1.  The
> union with like 5, 6 days data is fine.
> In the spark-shell, it just hang showing: Yarn scheduler submit 7+
> tasks.
>
> I am running spark 1.6 over hadoop 2.7.  Is there any setting I could
> change to make this work?
>
> Thanks,
> Gavin
>
>
>


Re: Best practices for sharing/maintaining large resource files for Spark jobs

2016-01-11 Thread Gavin Yue
Has anyone used Ignite in a production system?


On Mon, Jan 11, 2016 at 11:44 PM, Jörn Franke  wrote:

> You can look at ignite as a HDFS cache or for  storing rdds.
>
> > On 11 Jan 2016, at 21:14, Dmitry Goldenberg 
> wrote:
> >
> > We have a bunch of Spark jobs deployed and a few large resource files
> such as e.g. a dictionary for lookups or a statistical model.
> >
> > Right now, these are deployed as part of the Spark jobs which will
> eventually make the mongo-jars too bloated for deployments.
> >
> > What are some of the best practices to consider for maintaining and
> sharing large resource files like these?
> >
> > Thanks.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


parquet repartitions and parquet.enable.summary-metadata does not work

2016-01-10 Thread Gavin Yue
Hey,

I am trying to convert a bunch of JSON files into parquet, which would
output over 7000 parquet files. But there are too many files, so I want
to repartition based on id down to 3000.

But I got a GC error like this one:
https://mail-archives.apache.org/mod_mbox/spark-user/201512.mbox/%3CCAB4bC7_LR2rpHceQw3vyJ=l6xq9+9sjl3wgiispzyfh2xmt...@mail.gmail.com%3E#archives

So I set parquet.enable.summary-metadata to false. But when I call
write.parquet, I can still see the 3000 jobs run after the parquet write,
and they fail due to GC.

Basically, repartition has never succeeded for me. Are there any other
settings that could be tuned?
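For reference, the way the setting can be applied (a sketch; it goes through the Hadoop configuration used by the parquet writer, and the paths and column name are placeholders):

import sqlContext.implicits._

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

sqlContext.read.json("/data/json")
  .repartition(3000, $"id")          // Spark 1.6+ signature; "id" stands in for the real key
  .write.parquet("/data/parquet")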

Thanks,
Gavin


Too many tasks killed the scheduler

2016-01-10 Thread Gavin Yue
Hey,

I have 10 days data, each day has a parquet directory with over 7000
partitions.
So when I union 10 days and do a count, then it submits over 70K tasks.

Then the job failed silently with one container exiting with code 1. The
union of 5 or 6 days of data is fine.
In the spark-shell, it just hangs, showing: Yarn scheduler submit 7+
tasks.

I am running spark 1.6 over hadoop 2.7.  Is there any setting I could
change to make this work?

Thanks,
Gavin


Re: How to merge two large table and remove duplicates?

2016-01-09 Thread Gavin Yue
I saw in the documentation that the value is LZO. Is it LZO or LZ4?

https://github.com/Cyan4973/lz4

Based on this benchmark, they differ quite a lot.



On Fri, Jan 8, 2016 at 9:55 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> gzip is relatively slow. It consumes much CPU.
>
> snappy is faster.
>
> LZ4 is faster than GZIP and smaller than Snappy.
>
> Cheers
>
> On Fri, Jan 8, 2016 at 7:56 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>
>> Thank you .
>>
>> And speaking of compression, is there big difference on performance
>> between gzip and snappy? And why parquet is using gzip by default?
>>
>> Thanks.
>>
>>
>> On Fri, Jan 8, 2016 at 6:39 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Cycling old bits:
>>> http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ
>>>
>>> Gavin:
>>> Which release of hbase did you play with ?
>>>
>>> HBase has been evolving and is getting more stable.
>>>
>>> Cheers
>>>
>>> On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue <yue.yuany...@gmail.com>
>>> wrote:
>>>
>>>> I used to maintain a HBase cluster. The experience with it was not
>>>> happy.
>>>>
>>>> I just tried query the data  from each day's first and dedup with
>>>> smaller set, the performance is acceptable.  So I guess I will use this
>>>> method.
>>>>
>>>> Again, could anyone give advice about:
>>>>
>>>>- Automatically determine the number of reducers for joins and
>>>>groupbys: Currently in Spark SQL, you need to control the degree of
>>>>parallelism post-shuffle using “SET
>>>>spark.sql.shuffle.partitions=[num_tasks];”.
>>>>
>>>> Thanks.
>>>>
>>>> Gavin
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> bq. in an noSQL db such as Hbase
>>>>>
>>>>> +1 :-)
>>>>>
>>>>>
>>>>> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:
>>>>>
>>>>>> One option you may want to explore is writing event table in an noSQL
>>>>>> db such as Hbase. One inherent problem in your approach is you always 
>>>>>> need
>>>>>> to load either full data set or a defined number of partitions to see if
>>>>>> the event has already come (and no gurantee it is full proof, but lead to
>>>>>> unnecessary loading in most cases).
>>>>>>
>>>>>> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey,
>>>>>>> Thank you for the answer. I checked the setting you mentioend they
>>>>>>> are all correct.  I noticed that in the job, there are always only 200
>>>>>>> reducers for shuffle read, I believe it is setting in the sql shuffle
>>>>>>> parallism.
>>>>>>>
>>>>>>> In the doc, it mentions:
>>>>>>>
>>>>>>>- Automatically determine the number of reducers for joins and
>>>>>>>groupbys: Currently in Spark SQL, you need to control the degree of
>>>>>>>parallelism post-shuffle using “SET
>>>>>>>spark.sql.shuffle.partitions=[num_tasks];”.
>>>>>>>
>>>>>>>
>>>>>>> What would be the ideal number for this setting? Is it based on the
>>>>>>> hardware of cluster?
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Gavin
>>>>>>>
>>>>>>> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <bewang.t...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>- I assume your parquet files are compressed. Gzip or Snappy?
>>>>>>>>- What spark version did you use? It seems at least 1.4. If you
>>>>>>>>use spark-sql and tungsten, you might have better performance. but 
>>>>>>>> spark
>>>>>>>>1.5.2 gave me a wrong result when the data was about 300~400GB, 
>>>>>>>> just f

Re: How to merge two large table and remove duplicates?

2016-01-09 Thread Gavin Yue
So I tried to set the parquet compression codec to lzo, but Hadoop does not
have the LZO natives, while the LZ4 natives are included.
But I could not set the codec to lz4; it only accepts lzo.

Any solution here?
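For reference, the codec is picked via this SQL conf (a sketch; in the Spark 1.x line the accepted values are uncompressed, snappy, gzip and lzo, which matches what you are seeing; paths are placeholders):

sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
sqlContext.read.json("/data/events").write.parquet("/data/events_snappy")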

Thanks,
Gavin



On Sat, Jan 9, 2016 at 12:09 AM, Gavin Yue <yue.yuany...@gmail.com> wrote:

> I saw in the document, the value is LZO.Is it LZO or LZ4?
>
> https://github.com/Cyan4973/lz4
>
> Based on this benchmark, they differ quite a lot.
>
>
>
> On Fri, Jan 8, 2016 at 9:55 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> gzip is relatively slow. It consumes much CPU.
>>
>> snappy is faster.
>>
>> LZ4 is faster than GZIP and smaller than Snappy.
>>
>> Cheers
>>
>> On Fri, Jan 8, 2016 at 7:56 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>
>>> Thank you .
>>>
>>> And speaking of compression, is there big difference on performance
>>> between gzip and snappy? And why parquet is using gzip by default?
>>>
>>> Thanks.
>>>
>>>
>>> On Fri, Jan 8, 2016 at 6:39 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Cycling old bits:
>>>> http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ
>>>>
>>>> Gavin:
>>>> Which release of hbase did you play with ?
>>>>
>>>> HBase has been evolving and is getting more stable.
>>>>
>>>> Cheers
>>>>
>>>> On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>> wrote:
>>>>
>>>>> I used to maintain a HBase cluster. The experience with it was not
>>>>> happy.
>>>>>
>>>>> I just tried query the data  from each day's first and dedup with
>>>>> smaller set, the performance is acceptable.  So I guess I will use this
>>>>> method.
>>>>>
>>>>> Again, could anyone give advice about:
>>>>>
>>>>>- Automatically determine the number of reducers for joins and
>>>>>groupbys: Currently in Spark SQL, you need to control the degree of
>>>>>parallelism post-shuffle using “SET
>>>>>spark.sql.shuffle.partitions=[num_tasks];”.
>>>>>
>>>>> Thanks.
>>>>>
>>>>> Gavin
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> bq. in an noSQL db such as Hbase
>>>>>>
>>>>>> +1 :-)
>>>>>>
>>>>>>
>>>>>> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <guha.a...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> One option you may want to explore is writing event table in an
>>>>>>> noSQL db such as Hbase. One inherent problem in your approach is you 
>>>>>>> always
>>>>>>> need to load either full data set or a defined number of partitions to 
>>>>>>> see
>>>>>>> if the event has already come (and no gurantee it is full proof, but 
>>>>>>> lead
>>>>>>> to unnecessary loading in most cases).
>>>>>>>
>>>>>>> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey,
>>>>>>>> Thank you for the answer. I checked the setting you mentioend they
>>>>>>>> are all correct.  I noticed that in the job, there are always only 200
>>>>>>>> reducers for shuffle read, I believe it is setting in the sql shuffle
>>>>>>>> parallism.
>>>>>>>>
>>>>>>>> In the doc, it mentions:
>>>>>>>>
>>>>>>>>- Automatically determine the number of reducers for joins and
>>>>>>>>groupbys: Currently in Spark SQL, you need to control the degree of
>>>>>>>>parallelism post-shuffle using “SET
>>>>>>>>spark.sql.shuffle.partitions=[num_tasks];”.
>>>>>>>>
>>>>>>>>
>>>>>>>> What would be the ideal number for this setting? Is it based on the
>>>>>>>> hardware of cluster?
>>>>>>>>
>>>>>>>>
>>>&

Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Gavin Yue
Hey Ted,

The Event table is like this: UserID, EventType, EventKey, TimeStamp,
MetaData. I just parse it from JSON and save it as Parquet; I did not change
the partitioning.

Annoyingly, every day's incoming Event data has duplicates with the other
days. The same event could show up in Day1 and Day2 and probably Day3.

I only want to keep a single Event table, but each day brings in so many
duplicates.

Is there a way I could just insert into Parquet and, if a duplicate is found,
just ignore it?
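There is no built-in "insert if absent" for parquet, but an anti-join-style filter gets close. A sketch, assuming the event key columns named above; Spark 1.x has no left_anti join type, hence the left_outer plus isNull trick (paths are placeholders):

val existing = sqlContext.read.parquet("/events/merged")
val incoming = sqlContext.read.parquet("/events/day3")

// Keep only the incoming rows whose key has no match in the existing table.
val newOnly = incoming
  .join(existing,
        incoming("EventKey") === existing("EventKey") && incoming("UserID") === existing("UserID"),
        "left_outer")
  .where(existing("EventKey").isNull)
  .select(incoming("UserID"), incoming("EventType"), incoming("EventKey"),
          incoming("TimeStamp"), incoming("MetaData"))

newOnly.write.mode("append").parquet("/events/merged")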

Thanks,
Gavin







On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Is your Parquet data source partitioned by date ?
>
> Can you dedup within partitions ?
>
> Cheers
>
> On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>
>> I tried on Three day's data.  The total input is only 980GB, but the
>> shuffle write Data is about 6.2TB, then the job failed during shuffle read
>> step, which should be another 6.2TB shuffle read.
>>
>> I think to Dedup, the shuffling can not be avoided. Is there anything I
>> could do to stablize this process?
>>
>> Thanks.
>>
>>
>> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>
>>> Hey,
>>>
>>> I got everyday's Event table and want to merge them into a single Event
>>> table. But there so many duplicates among each day's data.
>>>
>>> I use Parquet as the data source.  What I am doing now is
>>>
>>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet
>>> file").
>>>
>>> Each day's Event is stored in their own Parquet file
>>>
>>> But it failed at the stage2 which keeps losing connection to one
>>> executor. I guess this is due to the memory issue.
>>>
>>> Any suggestion how I do this efficiently?
>>>
>>> Thanks,
>>> Gavin
>>>
>>
>>
>


How to merge two large table and remove duplicates?

2016-01-08 Thread Gavin Yue
Hey,

I get an Event table every day and want to merge them into a single Event
table. But there are so many duplicates across each day's data.

I use Parquet as the data source.  What I am doing now is

EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet
file").

Each day's Event is stored in their own Parquet file

But it fails at stage 2, which keeps losing the connection to one executor.
I guess this is due to a memory issue.

Any suggestion how I do this efficiently?
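One variation worth trying (a sketch; the column names come from later in this thread, so adjust to the real schema): dedup on the event key columns with dropDuplicates instead of a full-row distinct, so rows are grouped and compared only on the key fields rather than on every column.

val day1 = sqlContext.read.parquet("/events/day1")   // paths are placeholders
val day2 = sqlContext.read.parquet("/events/day2")

day1.unionAll(day2)
  .dropDuplicates(Seq("UserID", "EventType", "EventKey", "TimeStamp"))
  .write.parquet("/events/merged")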

Thanks,
Gavin


Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Gavin Yue
And the most frequent operation I am going to do is find the UserIDs that
have certain events, then retrieve all the events associated with each UserID.

In this case, how should I partition to speed up the process?

Thanks.

On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

> hey Ted,
>
> Event table is like this: UserID, EventType, EventKey, TimeStamp,
> MetaData.  I just parse it from Json and save as Parquet, did not change
> the partition.
>
> Annoyingly, every day's incoming Event data having duplicates among each
> other.  One same event could show up in Day1 and Day2 and probably Day3.
>
> I only want to keep single Event table and each day it come so many
> duplicates.
>
> Is there a way I could just insert into Parquet and if duplicate found,
> just ignore?
>
> Thanks,
> Gavin
>
>
>
>
>
>
>
> On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Is your Parquet data source partitioned by date ?
>>
>> Can you dedup within partitions ?
>>
>> Cheers
>>
>> On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>
>>> I tried on Three day's data.  The total input is only 980GB, but the
>>> shuffle write Data is about 6.2TB, then the job failed during shuffle read
>>> step, which should be another 6.2TB shuffle read.
>>>
>>> I think to Dedup, the shuffling can not be avoided. Is there anything I
>>> could do to stablize this process?
>>>
>>> Thanks.
>>>
>>>
>>> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <yue.yuany...@gmail.com>
>>> wrote:
>>>
>>>> Hey,
>>>>
>>>> I got everyday's Event table and want to merge them into a single Event
>>>> table. But there so many duplicates among each day's data.
>>>>
>>>> I use Parquet as the data source.  What I am doing now is
>>>>
>>>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet
>>>> file").
>>>>
>>>> Each day's Event is stored in their own Parquet file
>>>>
>>>> But it failed at the stage2 which keeps losing connection to one
>>>> executor. I guess this is due to the memory issue.
>>>>
>>>> Any suggestion how I do this efficiently?
>>>>
>>>> Thanks,
>>>> Gavin
>>>>
>>>
>>>
>>
>


Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Gavin Yue
I tried it on three days' data. The total input is only 980GB, but the
shuffle write data is about 6.2TB, and then the job failed during the shuffle
read step, which should be another 6.2TB of shuffle read.

I think the shuffling cannot be avoided for dedup. Is there anything I
could do to stabilize this process?

Thanks.


On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

> Hey,
>
> I got everyday's Event table and want to merge them into a single Event
> table. But there so many duplicates among each day's data.
>
> I use Parquet as the data source.  What I am doing now is
>
> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet
> file").
>
> Each day's Event is stored in their own Parquet file
>
> But it failed at the stage2 which keeps losing connection to one executor.
> I guess this is due to the memory issue.
>
> Any suggestion how I do this efficiently?
>
> Thanks,
> Gavin
>


Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Gavin Yue
I used to maintain an HBase cluster. The experience with it was not a happy one.

I just tried querying each day's data first and deduping against the smaller
set, and the performance is acceptable. So I guess I will use this method.

Again, could anyone give advice about the following? (See the sketch after the quoted setting.)

   - Automatically determine the number of reducers for joins and groupbys:
   Currently in Spark SQL, you need to control the degree of parallelism
   post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”.
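For reference, the setting quoted above can be applied per session like this (a sketch; 800 is only an illustrative value, typically sized so each reduce task handles a few hundred MB of shuffle input):

sqlContext.setConf("spark.sql.shuffle.partitions", "800")
// or equivalently:
sqlContext.sql("SET spark.sql.shuffle.partitions=800")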

Thanks.

Gavin




On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> bq. in an noSQL db such as Hbase
>
> +1 :-)
>
>
> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> One option you may want to explore is writing event table in an noSQL db
>> such as Hbase. One inherent problem in your approach is you always need to
>> load either full data set or a defined number of partitions to see if the
>> event has already come (and no gurantee it is full proof, but lead to
>> unnecessary loading in most cases).
>>
>> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yue.yuany...@gmail.com>
>> wrote:
>>
>>> Hey,
>>> Thank you for the answer. I checked the setting you mentioend they are
>>> all correct.  I noticed that in the job, there are always only 200 reducers
>>> for shuffle read, I believe it is setting in the sql shuffle parallism.
>>>
>>> In the doc, it mentions:
>>>
>>>- Automatically determine the number of reducers for joins and
>>>groupbys: Currently in Spark SQL, you need to control the degree of
>>>parallelism post-shuffle using “SET
>>>spark.sql.shuffle.partitions=[num_tasks];”.
>>>
>>>
>>> What would be the ideal number for this setting? Is it based on the
>>> hardware of cluster?
>>>
>>>
>>> Thanks,
>>>
>>> Gavin
>>>
>>> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <bewang.t...@gmail.com>
>>> wrote:
>>>
>>>>
>>>>- I assume your parquet files are compressed. Gzip or Snappy?
>>>>- What spark version did you use? It seems at least 1.4. If you use
>>>>spark-sql and tungsten, you might have better performance. but spark 
>>>> 1.5.2
>>>>gave me a wrong result when the data was about 300~400GB, just for a 
>>>> simple
>>>>group-by and aggregate.
>>>>- Did you use kyro serialization?
>>>>- you should have spark.shuffle.compress=true, verify it.
>>>>- How many tasks did you use? spark.default.parallelism=?
>>>>- What about this:
>>>>   - Read the data day by day
>>>>   - compute a bucket id from timestamp, e.g., the date and hour
>>>>   - Write into different buckets (you probably need a special
>>>>   writer to write data efficiently without shuffling the data).
>>>>   - distinct for each bucket. Because each bucket is small, spark
>>>>   can get it done faster than having everything in one run.
>>>>   - I think using groupBy (userId, timestamp) might be better than
>>>>   distinct. I guess distinct() will compare every field.
>>>>
>>>>
>>>> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>> wrote:
>>>>
>>>>> And the most frequent operation I am gonna do is find the UserID who
>>>>> have some events, then retrieve all the events associted with the UserID.
>>>>>
>>>>> In this case, how should I partition to speed up the process?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> hey Ted,
>>>>>>
>>>>>> Event table is like this: UserID, EventType, EventKey, TimeStamp,
>>>>>> MetaData.  I just parse it from Json and save as Parquet, did not change
>>>>>> the partition.
>>>>>>
>>>>>> Annoyingly, every day's incoming Event data having duplicates among
>>>>>> each other.  One same event could show up in Day1 and Day2 and probably
>>>>>> Day3.
>>>>>>
>>>>>> I only want to keep single Event table and each day it come so many
>>>>>> duplicates.
>>>>>>
>>>>>> Is there a way I could just insert into Parquet and if du

Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Gavin Yue
Thank you.

And speaking of compression, is there a big performance difference between
gzip and snappy? And why does parquet use gzip by default?

Thanks.


On Fri, Jan 8, 2016 at 6:39 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Cycling old bits:
> http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ
>
> Gavin:
> Which release of hbase did you play with ?
>
> HBase has been evolving and is getting more stable.
>
> Cheers
>
> On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>
>> I used to maintain a HBase cluster. The experience with it was not happy.
>>
>> I just tried query the data  from each day's first and dedup with smaller
>> set, the performance is acceptable.  So I guess I will use this method.
>>
>> Again, could anyone give advice about:
>>
>>- Automatically determine the number of reducers for joins and
>>groupbys: Currently in Spark SQL, you need to control the degree of
>>parallelism post-shuffle using “SET
>>spark.sql.shuffle.partitions=[num_tasks];”.
>>
>> Thanks.
>>
>> Gavin
>>
>>
>>
>>
>> On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> bq. in an noSQL db such as Hbase
>>>
>>> +1 :-)
>>>
>>>
>>> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:
>>>
>>>> One option you may want to explore is writing event table in an noSQL
>>>> db such as Hbase. One inherent problem in your approach is you always need
>>>> to load either full data set or a defined number of partitions to see if
>>>> the event has already come (and no gurantee it is full proof, but lead to
>>>> unnecessary loading in most cases).
>>>>
>>>> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hey,
>>>>> Thank you for the answer. I checked the setting you mentioend they are
>>>>> all correct.  I noticed that in the job, there are always only 200 
>>>>> reducers
>>>>> for shuffle read, I believe it is setting in the sql shuffle parallism.
>>>>>
>>>>> In the doc, it mentions:
>>>>>
>>>>>- Automatically determine the number of reducers for joins and
>>>>>groupbys: Currently in Spark SQL, you need to control the degree of
>>>>>parallelism post-shuffle using “SET
>>>>>spark.sql.shuffle.partitions=[num_tasks];”.
>>>>>
>>>>>
>>>>> What would be the ideal number for this setting? Is it based on the
>>>>> hardware of cluster?
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Gavin
>>>>>
>>>>> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <bewang.t...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>>- I assume your parquet files are compressed. Gzip or Snappy?
>>>>>>- What spark version did you use? It seems at least 1.4. If you
>>>>>>use spark-sql and tungsten, you might have better performance. but 
>>>>>> spark
>>>>>>1.5.2 gave me a wrong result when the data was about 300~400GB, just 
>>>>>> for a
>>>>>>simple group-by and aggregate.
>>>>>>- Did you use kyro serialization?
>>>>>>- you should have spark.shuffle.compress=true, verify it.
>>>>>>- How many tasks did you use? spark.default.parallelism=?
>>>>>>- What about this:
>>>>>>   - Read the data day by day
>>>>>>   - compute a bucket id from timestamp, e.g., the date and hour
>>>>>>   - Write into different buckets (you probably need a special
>>>>>>   writer to write data efficiently without shuffling the data).
>>>>>>   - distinct for each bucket. Because each bucket is small,
>>>>>>   spark can get it done faster than having everything in one run.
>>>>>>   - I think using groupBy (userId, timestamp) might be better
>>>>>>   than distinct. I guess distinct() will compare every field.
>>>>>>
>>>>>>
>>>>>> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> And t

Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Gavin Yue
Hey,
Thank you for the answer. I checked the settings you mentioned; they are all
correct.  I noticed that in the job there are always only 200 reducers for
shuffle read, which I believe is the SQL shuffle parallelism setting.

In the doc, it mentions:

   - Automatically determine the number of reducers for joins and groupbys:
   Currently in Spark SQL, you need to control the degree of parallelism
   post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”.


What would be the ideal number for this setting? Is it based on the
hardware of the cluster?
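
For example, a rough sketch of bumping it from the default 200 (the exact value
is only a guess; a common starting point is 2-3x the total executor cores):

sqlContext.setConf("spark.sql.shuffle.partitions", "600")
// equivalently, from SQL:
sqlContext.sql("SET spark.sql.shuffle.partitions=600")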


Thanks,

Gavin

On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <bewang.t...@gmail.com> wrote:

>
>- I assume your parquet files are compressed. Gzip or Snappy?
>- What spark version did you use? It seems at least 1.4. If you use
>spark-sql and tungsten, you might have better performance. but spark 1.5.2
>gave me a wrong result when the data was about 300~400GB, just for a simple
>group-by and aggregate.
>- Did you use Kryo serialization?
>- you should have spark.shuffle.compress=true, verify it.
>- How many tasks did you use? spark.default.parallelism=?
>- What about this:
>   - Read the data day by day
>   - compute a bucket id from timestamp, e.g., the date and hour
>   - Write into different buckets (you probably need a special writer
>   to write data efficiently without shuffling the data).
>   - distinct for each bucket. Because each bucket is small, spark can
>   get it done faster than having everything in one run.
>   - I think using groupBy (userId, timestamp) might be better than
>   distinct. I guess distinct() will compare every field.
>
>
> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>
>> And the most frequent operation I am going to do is find the UserIDs who have
>> some events, then retrieve all the events associated with each UserID.
>>
>> In this case, how should I partition to speed up the process?
>>
>> Thanks.
>>
>> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>
>>> hey Ted,
>>>
>>> Event table is like this: UserID, EventType, EventKey, TimeStamp,
>>> MetaData.  I just parse it from Json and save as Parquet, did not change
>>> the partition.
>>>
>>> Annoyingly, every day's incoming Event data has duplicates with the others.
>>> The same event could show up in Day1 and Day2 and probably Day3.
>>>
>>> I only want to keep a single Event table, and each day brings so many
>>> duplicates.
>>>
>>> Is there a way I could just insert into Parquet and if duplicate found,
>>> just ignore?
>>>
>>> Thanks,
>>> Gavin
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Is your Parquet data source partitioned by date ?
>>>>
>>>> Can you dedup within partitions ?
>>>>
>>>> Cheers
>>>>
>>>> On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>> wrote:
>>>>
>>>>> I tried on three days' data.  The total input is only 980GB, but the
>>>>> shuffle write data is about 6.2TB, and then the job failed during the shuffle
>>>>> read step, which should be another 6.2TB of shuffle read.
>>>>>
>>>>> I think the shuffling cannot be avoided for dedup. Is there anything
>>>>> I could do to stabilize this process?
>>>>>
>>>>> Thanks.
>>>>>
>>>>>
>>>>> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> I get every day's Event table and want to merge them into a single
>>>>>> Event table. But there are so many duplicates among each day's data.
>>>>>>
>>>>>> I use Parquet as the data source.  What I am doing now is
>>>>>>
>>>>>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet
>>>>>> file").
>>>>>>
>>>>>> Each day's Event is stored in their own Parquet file
>>>>>>
>>>>>> But it failed at the stage2 which keeps losing connection to one
>>>>>> executor. I guess this is due to the memory issue.
>>>>>>
>>>>>> Any suggestion how I do this efficiently?
>>>>>>
>>>>>> Thanks,
>>>>>> Gavin
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to concat few rows into a new column in dataframe

2016-01-05 Thread Gavin Yue
I tried Ted's solution and it works, but I keep hitting JVM out-of-memory
problems, and grouping by the key causes a lot of data shuffling.

So I am trying to order the data by ID first and save it as Parquet.  Is
there a way to make sure the data is partitioned so that each ID's data is
in one partition, so there would be no shuffling in the future?

Thanks.


On Tue, Jan 5, 2016 at 3:19 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> This would also be possible with an Aggregator in Spark 1.6:
>
> https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html
>
> On Tue, Jan 5, 2016 at 2:59 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Something like the following:
>>
>> val zeroValue = collection.mutable.Set[String]()
>>
>> val aggredated = data.aggregateByKey (zeroValue)((set, v) => set += v,
>> (setOne, setTwo) => setOne ++= setTwo)
>>
>> On Tue, Jan 5, 2016 at 2:46 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>
>>> Hey,
>>>
>>> For example, a table df with two columns
>>> id  name
>>> 1   abc
>>> 1   bdf
>>> 2   ab
>>> 2   cd
>>>
>>> I want to group by the id and concat the string into array of string.
>>> like this
>>>
>>> id
>>> 1 [abc,bdf]
>>> 2 [ab, cd]
>>>
>>> How could I achieve this in dataframe?  I stuck on df.groupBy("id"). ???
>>>
>>> Thanks
>>>
>>>
>>
>


Re: How to concat few rows into a new column in dataframe

2016-01-05 Thread Gavin Yue
I found that in 1.6 a DataFrame can do repartition.

Do I still need to do an orderBy first, or can I just repartition?
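
Something like this is what I have in mind, as a sketch (it assumes the 1.6
repartition-by-column API; the output path is a placeholder):

import org.apache.spark.sql.functions.col

df.repartition(col("id"))              // co-locate all rows for an id
  .sortWithinPartitions(col("id"))     // optional: sort inside each partition
  .write.parquet("hdfs:///by_id")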




On Tue, Jan 5, 2016 at 9:25 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

> I tried the Ted's solution and it works.   But I keep hitting the JVM out
> of memory problem.
> And grouping the key causes a lot of  data shuffling.
>
> So I am trying to order the data based on ID first and save as Parquet.
> Is there way to make sure that the data is partitioned that each ID's data
> is in one partition, so there would be no shuffling in the future?
>
> Thanks.
>
>
> On Tue, Jan 5, 2016 at 3:19 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> This would also be possible with an Aggregator in Spark 1.6:
>>
>> https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html
>>
>> On Tue, Jan 5, 2016 at 2:59 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Something like the following:
>>>
>>> val zeroValue = collection.mutable.Set[String]()
>>>
>>> val aggredated = data.aggregateByKey (zeroValue)((set, v) => set += v,
>>> (setOne, setTwo) => setOne ++= setTwo)
>>>
>>> On Tue, Jan 5, 2016 at 2:46 PM, Gavin Yue <yue.yuany...@gmail.com>
>>> wrote:
>>>
>>>> Hey,
>>>>
>>>> For example, a table df with two columns
>>>> id  name
>>>> 1   abc
>>>> 1   bdf
>>>> 2   ab
>>>> 2   cd
>>>>
>>>> I want to group by the id and concat the string into array of string.
>>>> like this
>>>>
>>>> id
>>>> 1 [abc,bdf]
>>>> 2 [ab, cd]
>>>>
>>>> How could I achieve this in dataframe?  I stuck on df.groupBy("id"). ???
>>>>
>>>> Thanks
>>>>
>>>>
>>>
>>
>


How to accelerate reading json file?

2016-01-05 Thread Gavin Yue
I am trying to read json files following the example:

val path = "examples/src/main/resources/jsonfile"
val people = sqlContext.read.json(path)

I have 1 TB of files in the path.  It took 1.2 hours just to finish
reading and inferring the schema.

But I already know the schema. Could I make this process shorter?
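
A minimal sketch of what I have in mind, supplying the known schema up front so
Spark can skip the inference pass (the field names below are just placeholders):

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", StringType, true),
  StructField("events", ArrayType(StringType), true)))

val people = sqlContext.read.schema(schema).json(path)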

Thanks a lot.


How to concat few rows into a new column in dataframe

2016-01-05 Thread Gavin Yue
Hey,

For example, a table df with two columns
id  name
1   abc
1   bdf
2   ab
2   cd

I want to group by the id and concatenate the strings into an array of strings,
like this

id
1 [abc,bdf]
2 [ab, cd]

How could I achieve this with a DataFrame?  I am stuck at df.groupBy("id").
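
One direction I am considering, as a sketch (it assumes collect_list from
org.apache.spark.sql.functions is usable here; in 1.6 it is backed by the Hive
UDAF, so it may require a HiveContext):

import org.apache.spark.sql.functions.collect_list

val grouped = df.groupBy("id").agg(collect_list("name").as("names"))
grouped.show()
// id  names
// 1   [abc, bdf]
// 2   [ab, cd]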

Thanks


Should I convert json into parquet?

2015-10-17 Thread Gavin Yue
I have json files which contain timestamped events.  Each event is associated
with a user id.

Now I want to group by user id, converting from

Event1 -> UserIDA;
Event2 -> UserIDA;
Event3 -> UserIDB;

To intermediate storage.
UserIDA -> (Event1, Event2...)
UserIDB-> (Event3...)

Then I will label positives and featurize the event vectors in many
different ways, and fit each of them into a logistic regression.

I want to save the intermediate storage permanently since it will be used many
times.  And there will be new events coming every day, so I need to update
this intermediate storage every day.

Right now I store the intermediate data as Json files.  Should I use
Parquet instead?  Or are there better solutions for this use case?
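
A rough sketch of the Parquet variant I am considering (field names and paths
are placeholders, and it assumes each event can be kept as a string payload):

import sqlContext.implicits._

val events = sqlContext.read.json("hdfs:///events/day1")
val byUser = events.select("userId", "eventId").rdd
  .map(r => (r.getString(0), r.getString(1)))
  .groupByKey()
  .map { case (user, evs) => (user, evs.toSeq) }
  .toDF("userId", "events")

byUser.write.mode("append").parquet("hdfs:///events_by_user")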

Thanks a lot !


Re: How to properly set conf/spark-env.sh for spark to run on yarn

2015-09-26 Thread Gavin Yue
It is working; we are doing the same thing every day.  But the remote server
needs to be able to talk with the ResourceManager.

If you are using spark-submit, you will also need to specify the Hadoop conf
directory in your env variables. Spark relies on that to locate where
the cluster's resource manager is.

I think this tutorial is pretty clear:
http://spark.apache.org/docs/latest/running-on-yarn.html



On Fri, Sep 25, 2015 at 7:11 PM, Zhiliang Zhu <zchl.j...@yahoo.com> wrote:

> Hi Yue,
>
> Thanks very much for your kind reply.
>
> I would like to submit spark job remotely on another machine outside the
> cluster,
> and the job will run on yarn, similar as hadoop job is already done, could
> you
> confirm it could exactly work for spark...
>
> Do you mean that I would print those variables on linux command side?
>
> Best Regards,
> Zhiliang
>
>
>
>
>
> On Saturday, September 26, 2015 10:07 AM, Gavin Yue <
> yue.yuany...@gmail.com> wrote:
>
>
> Print out your env variables and check first
>
> Sent from my iPhone
>
> On Sep 25, 2015, at 18:43, Zhiliang Zhu <zchl.j...@yahoo.com.INVALID
> <zchl.j...@yahoo.com.invalid>> wrote:
>
> Hi All,
>
> I would like to submit spark job on some another remote machine outside
> the cluster,
> I also copied hadoop/spark conf files under the remote machine, then hadoop
> job would be submitted, but spark job would not.
>
> In spark-env.sh, it may be due to that SPARK_LOCAL_IP is not properly set,
> or for some other reasons...
>
> This issue is urgent for me, would some expert provide some help about
> this problem...
>
> I will show sincere appreciation towards your help.
>
> Thank you!
> Best Regards,
> Zhiliang
>
>
>
>
> On Friday, September 25, 2015 7:53 PM, Zhiliang Zhu <
> zchl.j...@yahoo.com.INVALID <zchl.j...@yahoo.com.invalid>> wrote:
>
>
> Hi all,
>
> The spark job will run on yarn. While I do not set SPARK_LOCAL_IP any, or
> just set as
> export  SPARK_LOCAL_IP=localhost#or set as the specific node ip on the
> specific spark install directory
>
> It will work well to submit spark job on master node of cluster, however,
> it will fail by way of some gateway machine remotely.
>
> The gateway machine is already configed, it works well to submit hadoop
> job.
> It is set as:
> export SCALA_HOME=/usr/lib/scala
> export JAVA_HOME=/usr/java/jdk1.7.0_45
> export R_HOME=/usr/lib/r
> export HADOOP_HOME=/usr/lib/hadoop
> export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop
> export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
>
> export SPARK_MASTER_IP=master01
> #export SPARK_LOCAL_IP=master01  #if no SPARK_LOCAL_IP is set,
> SparkContext will not start
> export SPARK_LOCAL_IP=localhost #if localhost is set, SparkContext is
> started, but failed later
> export SPARK_LOCAL_DIRS=/data/spark_local_dir
> ...
>
> The error messages:
> 15/09/25 19:07:12 INFO util.Utils: Successfully started service
> 'sparkYarnAM' on port 48133.
> 15/09/25 19:07:12 INFO yarn.ApplicationMaster: Waiting for Spark driver to
> be reachable.
> 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to
> driver at 127.0.0.1:35706, retrying ...
> 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to
> driver at 127.0.0.1:35706, retrying ...
> 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to
> driver at 127.0.0.1:35706, retrying ...
>
>  I shall sincerely appreciate your kind help very much!
> Zhiliang
>
>
>
>
>
>
>


Re: executor-cores setting does not work under Yarn

2015-09-25 Thread Gavin Yue
I think I found the problem.

I had to change the YARN capacity scheduler to use

DominantResourceCalculator

Thanks!


On Fri, Sep 25, 2015 at 4:54 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Which version of spark are you having? Can you also check whats set in
> your conf/spark-defaults.conf file?
>
> Thanks
> Best Regards
>
> On Fri, Sep 25, 2015 at 1:58 AM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>
>> Running Spark app over Yarn 2.7
>>
>> Here is my sparksubmit setting:
>> --master yarn-cluster \
>>  --num-executors 100 \
>>  --executor-cores 3 \
>>  --executor-memory 20g \
>>  --driver-memory 20g \
>>  --driver-cores 2 \
>>
>> But the executor cores setting is not working. It always assigns only one
>> vcore  to one container based on the cluster metrics from yarn resource
>> manager website.
>>
>> And yarn setting for container is
>> min: <memory:6600, vCores:4>  max: <memory:106473, vCores:15>
>>
>> I have tried to change num-executors and executor memory. It even ignores
>> the min vCores setting and always assigns one core per container.
>>
>> Any advice?
>>
>> Thank you!
>>
>>
>>
>>
>>
>


Re: How to properly set conf/spark-env.sh for spark to run on yarn

2015-09-25 Thread Gavin Yue
Print out your env variables and check first 

Sent from my iPhone

> On Sep 25, 2015, at 18:43, Zhiliang Zhu  wrote:
> 
> Hi All,
> 
> I would like to submit spark job on some another remote machine outside the 
> cluster,
> I also copied hadoop/spark conf files under the remote machine, then hadoop
> job would be submitted, but spark job would not.
> 
> In spark-env.sh, it may be due to that SPARK_LOCAL_IP is not properly set,
> or for some other reasons...
> 
> This issue is urgent for me, would some expert provide some help about this 
> problem...
> 
> I will show sincere appreciation towards your help.
> 
> Thank you!
> Best Regards,
> Zhiliang
> 
> 
> 
> 
> On Friday, September 25, 2015 7:53 PM, Zhiliang Zhu 
>  wrote:
> 
> 
> Hi all,
> 
> The spark job will run on yarn. While I do not set SPARK_LOCAL_IP any, or 
> just set as
> export  SPARK_LOCAL_IP=localhost#or set as the specific node ip on the 
> specific spark install directory 
> 
> It will work well to submit spark job on master node of cluster, however, it 
> will fail by way of some gateway machine remotely.
> 
> The gateway machine is already configed, it works well to submit hadoop job.
> It is set as:
> export SCALA_HOME=/usr/lib/scala
> export JAVA_HOME=/usr/java/jdk1.7.0_45
> export R_HOME=/usr/lib/r
> export HADOOP_HOME=/usr/lib/hadoop
> export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop
> export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
> 
> export SPARK_MASTER_IP=master01
> #export SPARK_LOCAL_IP=master01  #if no SPARK_LOCAL_IP is set, SparkContext 
> will not start
> export SPARK_LOCAL_IP=localhost #if localhost is set, SparkContext is 
> started, but failed later
> export SPARK_LOCAL_DIRS=/data/spark_local_dir
> ...
> 
> The error messages:
> 15/09/25 19:07:12 INFO util.Utils: Successfully started service 'sparkYarnAM' 
> on port 48133.
> 15/09/25 19:07:12 INFO yarn.ApplicationMaster: Waiting for Spark driver to be 
> reachable.
> 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver 
> at 127.0.0.1:35706, retrying ...
> 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver 
> at 127.0.0.1:35706, retrying ...
> 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver 
> at 127.0.0.1:35706, retrying ...
> 
>  I shall sincerely appreciate your kind help very much!
> Zhiliang
> 
> 
> 
> 


executor-cores setting does not work under Yarn

2015-09-24 Thread Gavin Yue
Running Spark app over Yarn 2.7

Here is my sparksubmit setting:
--master yarn-cluster \
 --num-executors 100 \
 --executor-cores 3 \
 --executor-memory 20g \
 --driver-memory 20g \
 --driver-cores 2 \

But the executor cores setting is not working. It always assigns only one
vcore  to one container based on the cluster metrics from yarn resource
manager website.

And yarn setting for container is
min: <memory:6600, vCores:4>  max: <memory:106473, vCores:15>

I have tried to change num-executors and executor memory. It even ignores
the min vCores setting and always assigns one core per container.

Any advice?

Thank you!


Cache after filter Vs Writing back to HDFS

2015-09-17 Thread Gavin Yue
For a large dataset, I want to filter out some records and then do the
compute-intensive work.

What I am doing now:

Data.filter(somerules).cache()
Data.count()

Data.map(timeintensivecompute)

But this sometimes takes an unusually long time due to cache misses and
recomputation.

So I changed to this way:

Data.filter(somerules).saveAsTextFile(...)

sc.textFile(...).map(timeintensivecompute)

The second one is even faster.

How could I tune the job to reach maximum performance?
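
One thing I plan to try, as a sketch (somerules and timeintensivecompute are the
same placeholders as above; the output path is hypothetical): persist with disk
spill-over so evicted partitions are re-read from local disk instead of recomputed.

import org.apache.spark.storage.StorageLevel

val filtered = Data.filter(somerules).persist(StorageLevel.MEMORY_AND_DISK_SER)
filtered.count()                            // materialize once
filtered.map(timeintensivecompute).saveAsTextFile("hdfs:///out")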

Thank you.


Performance changes quite large

2015-09-17 Thread Gavin Yue
I am trying to parse quite a lot of large json files.

At the beginning, I was doing this:

textFile(path).map(line => parseJson(line)).count()

For each file (800 - 900 MB), it would take roughly 1 min to finish.

I then changed the code to

val rawData = textFile(path)
rawData.cache()
rawData.count()

rawData.map(line => parseJson(line)).count()

So for the first count action, it would take 2 secs for each file/task,
and for parsing it would take another 2-4 secs.

How could the time change so much, from 1 min to 4-6 secs?

The problem is I do not have enough memory to cache everything. I am using
jackson json parser coming with the Spark.


Please share your advice  on this.

Thank you !


Re: How to increase the Json parsing speed

2015-08-28 Thread Gavin Yue
500 each with 8GB memory.

I did the test again on the cluster.

I have 6000 files, which generates 6000 tasks.  Each task takes 1.5 min to
finish based on the stats.

So theoretically it should take roughly 15 mins. With some additional
overhead, it takes 18 mins in total.

Based on the local file parsing test, it seems simply parsing the json is
fast, taking only 7 secs.

So I wonder where the additional 1 min is coming from.

Thanks again.


On Thu, Aug 27, 2015 at 11:44 PM, Sabarish Sasidharan 
sabarish.sasidha...@manthan.com wrote:

 How many executors are you using when using Spark SQL?

 On Fri, Aug 28, 2015 at 12:12 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 I see that you are not reusing the same mapper instance in the Scala
 snippet.

 Regards
 Sab

 On Fri, Aug 28, 2015 at 9:38 AM, Gavin Yue yue.yuany...@gmail.com
 wrote:

 Just did some tests.

 I have 6000 files, each has 14K records with 900Mb file size.  In spark
 sql, it would take one task roughly 1 min to parse.

 On the local machine, using the same Jackson lib inside Spark lib. Just
 parse it.

 FileInputStream fstream = new FileInputStream(testfile);
 BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
 ObjectMapper mapper = new ObjectMapper();   // reused for every line
 String strLine;
 Long begin = System.currentTimeMillis();
 while ((strLine = br.readLine()) != null) {
   JsonNode s = mapper.readTree(strLine);
 }
 System.out.println(System.currentTimeMillis() - begin);

 In JDK8, it took *6270ms.*

 Same code in Scala, it would take *7486ms*:

 val begin = java.lang.System.currentTimeMillis()
 for (line <- Source.fromFile(testfile).getLines())
 {
   val mapper = new ObjectMapper()
   mapper.registerModule(DefaultScalaModule)
   val s = mapper.readTree(line)
 }
 println(java.lang.System.currentTimeMillis() - begin)


 One Json record contains two fields:  ID and List[Event].

 I am guessing that putting all the events into the List takes the remaining time.

 Any solution to speed this up?

 Thanks a lot!


 On Thu, Aug 27, 2015 at 7:45 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 For your jsons, can you tell us what is your benchmark when running on
 a single machine using just plain Java (without Spark and Spark sql)?

 Regards
 Sab
 On 28-Aug-2015 7:29 am, Gavin Yue yue.yuany...@gmail.com wrote:

 Hey

 I am using the Json4s-Jackson parser coming with spark and parsing
 roughly 80m records with totally size 900mb.

 But the speed is slow.  It took my 50 nodes(16cores cpu,100gb mem)
 roughly 30mins to parse Json to use spark sql.

 Jackson has the benchmark saying parsing should be ms level.

 Any way to increase speed?

 I am using spark 1.4 on Hadoop 2.7 with Java 8.

 Thanks a lot !
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --

 Architect - Big Data
 Ph: +91 99805 99458

 Manthan Systems | *Company of the year - Analytics (2014 Frost and
 Sullivan India ICT)*
 +++




 --

 Architect - Big Data
 Ph: +91 99805 99458

 Manthan Systems | *Company of the year - Analytics (2014 Frost and
 Sullivan India ICT)*
 +++



Re: How to increase the Json parsing speed

2015-08-27 Thread Gavin Yue
Just did some tests.

I have 6000 files, each with 14K records and a 900MB file size.  In Spark
SQL, it takes one task roughly 1 min to parse.

On the local machine, I used the same Jackson lib that ships with Spark and just
parsed it:

FileInputStream fstream = new FileInputStream(testfile);
BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
ObjectMapper mapper = new ObjectMapper();   // reused for every line
String strLine;
Long begin = System.currentTimeMillis();
while ((strLine = br.readLine()) != null) {
  JsonNode s = mapper.readTree(strLine);
}
System.out.println(System.currentTimeMillis() - begin);

In JDK8, it took *6270ms.*

Same code in Scala, it would take *7486ms*:

val begin = java.lang.System.currentTimeMillis()
for (line <- Source.fromFile(testfile).getLines())
{
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)
  val s = mapper.readTree(line)
}
println(java.lang.System.currentTimeMillis() - begin)


One Json record contains two fields:  ID and List[Event].

I am guessing that putting all the events into the List takes the remaining time.

Any solution to speed this up?

Thanks a lot!


On Thu, Aug 27, 2015 at 7:45 PM, Sabarish Sasidharan 
sabarish.sasidha...@manthan.com wrote:

 For your jsons, can you tell us what is your benchmark when running on a
 single machine using just plain Java (without Spark and Spark sql)?

 Regards
 Sab
 On 28-Aug-2015 7:29 am, Gavin Yue yue.yuany...@gmail.com wrote:

 Hey

 I am using the Json4s-Jackson parser coming with spark and parsing
 roughly 80m records with totally size 900mb.

 But the speed is slow.  It took my 50 nodes(16cores cpu,100gb mem)
 roughly 30mins to parse Json to use spark sql.

 Jackson has the benchmark saying parsing should be ms level.

 Any way to increase speed?

 I am using spark 1.4 on Hadoop 2.7 with Java 8.

 Thanks a lot !
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




How to increase the Json parsing speed

2015-08-27 Thread Gavin Yue
Hey 

I am using the Json4s-Jackson parser that comes with Spark and parsing roughly 80m
records with a total size of 900MB.

But the speed is slow.  It took my 50 nodes (16-core CPU, 100GB mem) roughly
30 mins to parse the Json for use in Spark SQL.

Jackson has the benchmark saying parsing should be ms level. 

Any way to increase speed? 

I am using spark 1.4 on Hadoop 2.7 with Java 8. 

Thanks a lot ! 
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Abount Jobs UI in yarn-client mode

2015-06-20 Thread Gavin Yue
I got the same problem when I upgrade from 1.3.1 to 1.4.

The same Conf has been used, 1.3 works, but 1.4UI does not work.

So I added the following to yarn-site.xml:

  <property>
    <name>yarn.resourcemanager.webapp.address</name>
    <value>:8088</value>
  </property>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value></value>
  </property>

That solved the problem.

Spark 1.4 + Yarn 2.7 + Java 8

On Fri, Jun 19, 2015 at 8:48 AM, Sea 261810...@qq.com wrote:

 Hi, all:
 I run Spark on YARN and I want to see the Jobs UI at http://ip:4040/,
 but it redirects to http://
 ${yarn.ip}/proxy/application_1428110196022_924324/ which cannot be
 found. Why?
 Anyone can help?



How could output the StreamingLinearRegressionWithSGD prediction result?

2015-06-20 Thread Gavin Yue
Hey,

I am testing the StreamingLinearRegressionWithSGD following the tutorial.


It works, but I could not output the prediction results. I tried
saveAsTextFile, but it only wrote _SUCCESS to the folder.


I am trying to check the prediction results and use
BinaryClassificationMetrics to get areaUnderROC.


Any example for this?
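
What I am attempting now, as a sketch (model is the trained
StreamingLinearRegressionWithSGD, testData is a DStream of LabeledPoints, and
the output prefix is a placeholder):

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

val predictions = model.predictOnValues(testData.map(lp => (lp.label, lp.features)))
predictions.saveAsTextFiles("hdfs:///predictions")

predictions.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // BinaryClassificationMetrics expects (score, label) pairs
    val metrics = new BinaryClassificationMetrics(rdd.map(_.swap))
    println("areaUnderROC = " + metrics.areaUnderROC())
  }
}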

Thank you !


Re: What is most efficient to do a large union and remove duplicates?

2015-06-14 Thread Gavin Yue
Each folder should have no dups; dups only exist across different folders.

The logic inside is to keep only the longest string value for each key.

The current problem is exceeding the largest frame size when trying to write to
HDFS: it hits about 500m while the setting is 80m.

Sent from my iPhone

 On Jun 14, 2015, at 02:10, ayan guha guha.a...@gmail.com wrote:
 
 Can you do dedupe process locally for each file first and then globally? 
 Also I did not fully get the logic of the part inside reducebykey. Can you 
 kindly explain?
 
 On 14 Jun 2015 13:58, Gavin Yue yue.yuany...@gmail.com wrote:
 I have 10 folders, each with 6000 files. Each folder is roughly 500GB, so
 5TB of data in total.
 
 The data is formatted as key \t value.  After union, I want to remove the
 duplicates among keys, so each key should be unique and have only one value.
 
 Here is what I am doing. 
 
 val folders = Array("folder1", "folder2", ..., "folder10")
 
 var rawData = sc.textFile(folders(0)).map(x => (x.split("\t")(0), x.split("\t")(1)))
 
 for (a <- 1 to folders.length - 1) {
   rawData = rawData.union(sc.textFile(folders(a)).map(x => (x.split("\t")(0), x.split("\t")(1))))
 }
 
 val nodups = rawData.reduceByKey((a, b) =>
   if (a.length > b.length) a else b
 )
 nodups.saveAsTextFile("/nodups")
 
 Anything I could do to make this process faster?   Right now my process dies
 when outputting the data to HDFS.
 
 
 Thank you !


What is most efficient to do a large union and remove duplicates?

2015-06-13 Thread Gavin Yue
I have 10 folders, each with 6000 files. Each folder is roughly 500GB, so
5TB of data in total.

The data is formatted as key \t value.  After union, I want to remove
the duplicates among keys, so each key should be unique and have only one
value.

Here is what I am doing.

val folders = Array("folder1", "folder2", ..., "folder10")

var rawData = sc.textFile(folders(0)).map(x => (x.split("\t")(0), x.split("\t")(1)))

for (a <- 1 to folders.length - 1) {
  rawData = rawData.union(sc.textFile(folders(a)).map(x => (x.split("\t")(0), x.split("\t")(1))))
}

val nodups = rawData.reduceByKey((a, b) =>
  if (a.length > b.length) a else b
)
nodups.saveAsTextFile("/nodups")

Anything I could do to make this process faster?   Right now my process
dies when outputting the data to HDFS.
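
One variation I am considering, as a sketch (reading all folders in a single
textFile call and raising the reduce parallelism; 4000 is only a guess):

val rawData = sc.textFile(folders.mkString(","))
  .map { line => val f = line.split("\t", 2); (f(0), f(1)) }

val nodups = rawData.reduceByKey(
  (a, b) => if (a.length > b.length) a else b,
  4000)   // number of reduce partitions; tune to the cluster

nodups.map { case (k, v) => k + "\t" + v }.saveAsTextFile("/nodups")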


Thank you !

