Hi Lamber-ken,

If you agree that reduceByKey() will shuffle data, then it would serialize and deserialize the records anyway, correct?
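For concreteness, here is a minimal sketch of the mapToPair/reduceByKey style merge under discussion. The Record type, the timestamp field, and the pick-latest merge function are illustrative placeholders rather than the actual Hudi code; the point is that reduceByKey repartitions by key, so records crossing executors are still serialized and deserialized during the shuffle:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

public class ReduceByKeyMergeSketch {

  // Illustrative record type; the real code path works on HoodieRecord.
  static class Record implements java.io.Serializable {
    String key;
    long ts;
    Record(String key, long ts) { this.key = key; this.ts = ts; }
  }

  // Merge new records against old records using only Spark built-in operators:
  // key both sides by record key, then keep the latest version per key.
  static JavaRDD<Record> merge(JavaRDD<Record> newRecords, JavaRDD<Record> oldRecords) {
    JavaPairRDD<String, Record> keyed =
        newRecords.union(oldRecords).mapToPair(r -> new Tuple2<>(r.key, r));

    // reduceByKey shuffles by key: records moved between executors are
    // serialized on the map side and deserialized on the reduce side,
    // so the ser/de cost shifts into the shuffle rather than disappearing.
    return keyed.reduceByKey((a, b) -> a.ts >= b.ts ? a : b).values();
  }
}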
I am not denying that this may be a valid approach.. But we need much more rigorous testing, and potentially to implement both approaches side-by-side to compare. IMO we cannot conclude based on the one test we had, where the metadata overhead was so high. The first step would be to introduce abstractions so that these two ways can be implemented side-by-side and controlled by a flag.

Also, let's separate the RDD vs DataFrame discussion out of this, since that's orthogonal anyway.

Thanks
Vinoth

On Fri, Feb 28, 2020 at 11:02 AM lamberken <[email protected]> wrote:

> Hi vinoth,
>
> Thanks for reviewing the initial design :)
> I know there are many problems at present (e.g. shuffling, parallelism
> issues). We can discuss the practicability of the idea first.
>
> > ExternalSpillableMap itself was not the issue right, the serialization was
> Right, the new design will not have this issue, because it will not use it at all.
>
> > This map is also used on the query side
> Right, the proposal aims to improve the merge performance of the cow table.
>
> > HoodieWriteClient.java#L546 We cannot collect() the recordRDD at all ... OOM driver
> Here, in order to get the Map<fileId, partition>, we execute distinct()
> before collect(), so the result is very small.
> Also, it can be implemented in FileSystemViewManager, and lazy loading is also ok.
>
> > Doesn't this move the problem to tuning spark simply?
> There are two serious performance problems in the old merge logic:
> 1. when upserting many records, it serializes records to disk, then
> deserializes them when merging with old records
> 2. only a single thread consumes the old records one by one and then handles
> the merge process, which is much less efficient.
>
> > doing a sort based merge repartitionAndSortWithinPartitions
> Trying to understand your point :)
>
> Compared to the old version, there may be several improvements:
> 1. it uses spark built-in operators, so it's easier to understand.
> 2. during my testing, the upsert performance doubled.
> 3. if possible, we can write data in batches by using DataFrame in the future.
>
> [1]
> https://github.com/BigDataArtisans/incubator-hudi/blob/new-cow-merge/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
>
> Best,
> Lamber-Ken
>
> At 2020-02-29 01:40:36, "Vinoth Chandar" <[email protected]> wrote:
> >Doesn't this move the problem to simply tuning spark? The
> >ExternalSpillableMap itself was not the issue right, the serialization
> >was. This map is also used on the query side btw, where we need something
> >like that.
> >
> >I took a pass at the code. I think we are shuffling data again for the
> >reduceByKey step in this approach? For MOR, note that this is unnecessary
> >since we simply log the records and there is no merge. This approach might
> >have better parallelism of merging when that's costly.. But ultimately,
> >our write parallelism is limited by the number of affected files right? So
> >it's not clear to me that this would be a win always..
> >
> >On the code itself,
> >https://github.com/BigDataArtisans/incubator-hudi/blob/new-cow-merge/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java#L546
> >We cannot collect() the recordRDD at all.. It will OOM the driver .. :)
> >
> >Orthogonally, one thing we could think of is: doing a sort based merge..
> >i.e. repartitionAndSortWithinPartitions() the input records to the merge
> >handle, and if the file is also sorted on disk (it's not today), then we
> >can do a merge_sort like algorithm to perform the merge.. We can probably
> >write code to bear one-time sorting costs... This will eliminate the need
> >for memory for merging altogether..
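A rough sketch of the sort-based merge idea above, assuming the base file is already sorted by record key. The openSortedBaseFile(), write() and mergeRecords() helpers are hypothetical, and routing records to their file group is simplified to a plain hash partitioner here:

import java.util.Comparator;
import java.util.Iterator;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

public class SortMergeSketch {

  // Sort incoming records by key within each partition, then stream them
  // against a key-sorted base file like a merge sort, so the merge itself
  // needs no in-memory or spillable map.
  static void sortMergeWrite(JavaPairRDD<String, byte[]> incoming, int numFileGroups) {
    JavaPairRDD<String, byte[]> sorted = incoming.repartitionAndSortWithinPartitions(
        new HashPartitioner(numFileGroups), Comparator.<String>naturalOrder());

    sorted.foreachPartition(iter -> {
      Iterator<Tuple2<String, byte[]>> base = openSortedBaseFile();
      Tuple2<String, byte[]> old = base.hasNext() ? base.next() : null;
      while (iter.hasNext()) {
        Tuple2<String, byte[]> in = iter.next();
        // Copy base-file records with smaller keys through unchanged.
        while (old != null && old._1().compareTo(in._1()) < 0) {
          write(old);
          old = base.hasNext() ? base.next() : null;
        }
        if (old != null && old._1().equals(in._1())) {
          write(mergeRecords(old, in));   // key exists: merge old and new versions
          old = base.hasNext() ? base.next() : null;
        } else {
          write(in);                      // brand new key: plain insert
        }
      }
      while (old != null) {               // flush the tail of the base file
        write(old);
        old = base.hasNext() ? base.next() : null;
      }
    });
  }

  // Placeholders for illustration only.
  static Iterator<Tuple2<String, byte[]>> openSortedBaseFile() { throw new UnsupportedOperationException(); }
  static void write(Tuple2<String, byte[]> rec) { }
  static Tuple2<String, byte[]> mergeRecords(Tuple2<String, byte[]> oldRec, Tuple2<String, byte[]> newRec) { return newRec; }
}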
> >
> >On Wed, Feb 26, 2020 at 10:11 PM lamberken <[email protected]> wrote:
> >
> >> hi, vinoth
> >>
> >> > What do you mean by spark built in operators
> >> We may not need to depend on ExternalSpillableMap again when upserting
> >> to a cow table.
> >>
> >> > Are you suggesting that we perform the merging in sql
> >> No, just only use spark built-in operators like mapToPair, reduceByKey etc.
> >>
> >> Details have been described in this article[1], and I have also finished
> >> a draft implementation and test.
> >> Mainly the HoodieWriteClient#upsertRecordsInternal method was modified.
> >>
> >> [1]
> >> https://docs.google.com/document/d/1-EHHfemtwtX2rSySaPMjeOAUkg5xfqJCKLAETZHa7Qw/edit?usp=sharing
> >> [2]
> >> https://github.com/BigDataArtisans/incubator-hudi/blob/new-cow-merge/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
> >>
> >> At 2020-02-27 13:45:57, "Vinoth Chandar" <[email protected]> wrote:
> >> >Hi lamber-ken,
> >> >
> >> >Thanks for this. I am not quite following the proposal. What do you mean
> >> >by spark built-in operators? Don't we use the RDD based spark operations?
> >> >
> >> >Are you suggesting that we perform the merging in sql? Not following.
> >> >Please clarify.
> >> >
> >> >On Wed, Feb 26, 2020 at 10:08 AM lamberken <[email protected]> wrote:
> >> >
> >> >> Hi guys,
> >> >>
> >> >> Motivation
> >> >> Improve the merge performance for the cow table when upserting, by
> >> >> handling the merge operation using spark built-in operators.
> >> >>
> >> >> Background
> >> >> When doing an upsert operation, for each bucket, hudi needs to put the
> >> >> new input elements into an in-memory cache map, and will need an
> >> >> external map that spills content to disk when there is insufficient
> >> >> space for it to grow.
> >> >>
> >> >> There are several performance issues:
> >> >> 1. We may need an external disk map, and to serialize / deserialize records
> >> >> 2. Only a single thread does the I/O operation when checking
> >> >> 3. Can't take advantage of built-in spark operators
> >> >>
> >> >> Based on the above, I reworked the merge logic and did a draft test.
> >> >> If you are also interested in this, please go through the doc[1]; any
> >> >> suggestions are welcome. :)
> >> >>
> >> >> Thanks,
> >> >> Lamber-Ken
> >> >>
> >> >> [1]
> >> >> https://docs.google.com/document/d/1-EHHfemtwtX2rSySaPMjeOAUkg5xfqJCKLAETZHa7Qw/edit?usp=sharing
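To make the Background description in the original proposal above concrete, here is a very rough sketch of the existing merge pattern it refers to: incoming records buffered in a key-to-record map (an ExternalSpillableMap in the real code, a plain Map in this sketch), with a single thread then streaming the old base file's records through it. All types and helpers here are illustrative placeholders, not Hudi's actual implementation:

import java.util.Iterator;
import java.util.Map;

public class SpillableMapMergeSketch {

  // incomingByKey stands in for the spillable map of upserts keyed by record key;
  // in the real code, entries beyond the memory budget are serialized to disk and
  // deserialized again on lookup.
  static void mergeBucket(Map<String, String> incomingByKey, Iterator<String[]> oldBaseFile) {
    // Single-threaded pass over the existing base file: each old record is read,
    // checked against the map, and rewritten either as-is or with the new payload.
    while (oldBaseFile.hasNext()) {
      String[] old = oldBaseFile.next();  // old[0] = key, old[1] = payload (illustrative layout)
      String key = old[0];
      if (incomingByKey.containsKey(key)) {
        writeToNewBaseFile(key, incomingByKey.remove(key));  // updated record
      } else {
        writeToNewBaseFile(key, old[1]);                     // unchanged record, copied over
      }
    }
    // Keys that never matched an old record become inserts into the new file version.
    incomingByKey.forEach(SpillableMapMergeSketch::writeToNewBaseFile);
  }

  static void writeToNewBaseFile(String key, String payload) { /* placeholder writer */ }
}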
