See the first half of this wiki: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LZO
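For reference, a minimal sketch of how the codec is chosen in a Spark 1.5/1.6 shell. The config key is real; the paths and codec choice are illustrative. As far as I know, those releases accept only "uncompressed", "snappy", "gzip", and "lzo" for this key (not "lz4"), and "lzo" additionally requires the hadoop-lzo native libraries on every node:

    // Sketch, Spark 1.5/1.6 DataFrame API; paths are made up.
    val df = sqlContext.read.parquet("/data/events/raw")
    // Select the Parquet output codec for subsequent writes.
    sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
    df.write.parquet("/data/events/snappy")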
On Jan 9, 2016, at 1:02 AM, Gavin Yue <yue.yuany...@gmail.com> wrote:

So I tried to set the Parquet compression codec to lzo, but Hadoop does not have the lzo natives, while the lz4 natives are included. Yet I could not set the codec to lz4; it only accepts lzo.

Any solution here?

Thanks,
Gavin

On Sat, Jan 9, 2016 at 12:09 AM, Gavin Yue <yue.yuany...@gmail.com> wrote:

I saw in the document that the value is LZO. Is it LZO or LZ4?

https://github.com/Cyan4973/lz4

Based on this benchmark, they differ quite a lot.

On Fri, Jan 8, 2016 at 9:55 PM, Ted Yu <yuzhih...@gmail.com> wrote:

gzip is relatively slow; it consumes much CPU.

snappy is faster.

LZ4 is faster than gzip and smaller than snappy.

Cheers

On Fri, Jan 8, 2016 at 7:56 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

Thank you.

And speaking of compression, is there a big performance difference between gzip and snappy? And why does Parquet use gzip by default?

Thanks.

On Fri, Jan 8, 2016 at 6:39 PM, Ted Yu <yuzhih...@gmail.com> wrote:

Cycling old bits: http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ

Gavin: which release of HBase did you play with?

HBase has been evolving and is getting more stable.

Cheers

On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

I used to maintain an HBase cluster. The experience with it was not a happy one.

I just tried querying each day's data first and deduping against the smaller set, and the performance is acceptable. So I guess I will use this method.

Again, could anyone give advice about this line from the docs? "Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you need to control the degree of parallelism post-shuffle using 'SET spark.sql.shuffle.partitions=[num_tasks];'."

Thanks.

Gavin

On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu <yuzhih...@gmail.com> wrote:

bq. in a NoSQL db such as HBase

+1 :-)

On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:

One option you may want to explore is writing the event table to a NoSQL db such as HBase. One inherent problem in your approach is that you always need to load either the full data set or a defined number of partitions to check whether an event has already arrived (and there is no guarantee that is foolproof, while it leads to unnecessary loading in most cases).

On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

Hey,

Thank you for the answer. I checked the settings you mentioned and they are all correct. I noticed that the job always uses only 200 reducers for the shuffle read; I believe that is the SQL shuffle parallelism setting.

The doc mentions: "Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you need to control the degree of parallelism post-shuffle using 'SET spark.sql.shuffle.partitions=[num_tasks];'."

What would be the ideal number for this setting? Is it based on the hardware of the cluster?

Thanks,
Gavin
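For what it's worth, a sketch of adjusting that setting in a 1.x shell. The value 2000 is purely illustrative; a common starting point (an assumption of mine, not advice from this thread) is a small multiple of the total executor cores, or roughly total shuffle bytes divided by a target partition size of around 128MB:

    // Raise post-shuffle parallelism from the default of 200.
    sqlContext.setConf("spark.sql.shuffle.partitions", "2000")
    // Equivalent form in Spark SQL:
    sqlContext.sql("SET spark.sql.shuffle.partitions=2000")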
On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <bewang.t...@gmail.com> wrote:

I assume your Parquet files are compressed. Gzip or Snappy?

What Spark version did you use? It seems to be at least 1.4. If you use spark-sql and Tungsten, you might see better performance, but Spark 1.5.2 gave me a wrong result when the data was about 300~400GB, just for a simple group-by and aggregate.

Did you use Kryo serialization?

You should have spark.shuffle.compress=true; verify it.

How many tasks did you use? spark.default.parallelism=?

What about this:

- Read the data day by day.
- Compute a bucket id from the timestamp, e.g., the date and hour.
- Write into different buckets (you probably need a special writer to write data efficiently without shuffling the data).
- Run distinct for each bucket. Because each bucket is small, Spark can get it done faster than having everything in one run.

I think using groupBy (userId, timestamp) might be better than distinct. I guess distinct() will compare every field.
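A rough sketch of that bucket-and-dedup idea as I read it. The column names come from Gavin's schema further down the thread; the paths and the choice of to_date as the bucket function are assumptions:

    import org.apache.spark.sql.functions._

    // Read several days at once; paths are made up.
    val events = sqlContext.read.parquet(
      "/data/events/day1", "/data/events/day2", "/data/events/day3")

    // The bucket id is derived from the event's own TimeStamp, so
    // duplicates of one event land in the same bucket no matter which
    // day's file they arrived in.
    val bucketed = events.withColumn("bucket", to_date(col("TimeStamp")))

    // Dedup on the identifying columns only. This shuffles by those
    // keys, which is what "groupBy (userId, timestamp) instead of
    // distinct" amounts to: every field is no longer compared.
    bucketed
      .dropDuplicates(Seq("UserID", "EventType", "EventKey", "TimeStamp"))
      .write.partitionBy("bucket").parquet("/data/events/merged")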
On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

The most frequent operation I am going to do is find the UserIDs that have certain events, then retrieve all the events associated with those UserIDs.

In this case, how should I partition to speed up the process?

Thanks.

On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

Hey Ted,

The Event table is like this: UserID, EventType, EventKey, TimeStamp, MetaData. I just parse it from JSON and save it as Parquet; I did not change the partitioning.

Annoyingly, every day's incoming Event data contains duplicates. The same event can show up in Day1 and Day2 and probably Day3.

I only want to keep a single Event table, yet each day brings so many duplicates.

Is there a way I could just insert into Parquet and, if a duplicate is found, simply ignore it?

Thanks,
Gavin

On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yuzhih...@gmail.com> wrote:

Is your Parquet data source partitioned by date?

Can you dedup within partitions?

Cheers

On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

I tried it on three days' data. The total input is only 980GB, but the shuffle write is about 6.2TB; the job then failed during the shuffle read step, which would have been another 6.2TB of shuffle read.

I think that for dedup the shuffling cannot be avoided. Is there anything I could do to stabilize this process?

Thanks.

On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

Hey,

I get each day's Event table and want to merge them into a single Event table. But there are so many duplicates among each day's data.

I use Parquet as the data source. What I am doing now is:

    EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet file")

Each day's events are stored in their own Parquet file.

But it failed at stage 2, which kept losing the connection to one executor. I guess this is due to a memory issue.

Any suggestions on how to do this efficiently?

Thanks,
Gavin

--
Best Regards,
Ayan Guha
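Finally, a sketch combining the union-and-dedup step with the UserID lookup pattern Gavin asks about above. Paths and the user id are made up; dropDuplicates on the key columns stands in for the thread's full-row distinct(), and repartitioning by a column requires Spark 1.6 or later:

    import org.apache.spark.sql.functions.col

    val day1 = sqlContext.read.parquet("/data/events/day1")
    val day2 = sqlContext.read.parquet("/data/events/day2")

    // Dedup on the identifying columns rather than comparing every
    // field, then co-locate each user's events (Spark 1.6+) so a
    // later per-user scan can skip most files via Parquet statistics.
    day1.unionAll(day2)
      .dropDuplicates(Seq("UserID", "EventType", "EventKey", "TimeStamp"))
      .repartition(col("UserID"))
      .write.parquet("/data/events/all")

    // Later: retrieve all events for one user.
    val oneUser = sqlContext.read.parquet("/data/events/all")
      .where(col("UserID") === "user123")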