Doesn't this simply move the problem to tuning Spark? The ExternalSpillableMap itself was not the issue, right? The serialization was. This map is also used on the query side, btw, where we need something like it.
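For readers skimming the thread: below is a minimal, hypothetical sketch of the mapToPair/reduceByKey pre-merge that lamber-ken proposes further down. The class name, the String payloads, and the latest-wins reducer are illustrative stand-ins, not Hudi code; a real implementation would invoke the record payload's combine logic instead.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class ReduceByKeyMergeSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("reduceByKey-merge-sketch").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // Toy stand-ins for incoming upsert records: (recordKey, payload).
      JavaRDD<Tuple2<String, String>> input = jsc.parallelize(Arrays.asList(
          new Tuple2<>("key1", "v1"),
          new Tuple2<>("key1", "v2"),   // duplicate key that must be pre-combined
          new Tuple2<>("key2", "v1")));

      // mapToPair + reduceByKey: let Spark's shuffle machinery combine
      // duplicates per key, instead of a per-executor (spillable) map.
      JavaPairRDD<String, String> merged = input
          .mapToPair(t -> new Tuple2<>(t._1, t._2))
          // Toy combine; real code would call the payload's merge method.
          .reduceByKey((left, right) -> right);

      // collect() is safe only on demo-sized data; never on a real record RDD.
      merged.collect().forEach(t -> System.out.println(t._1 + " -> " + t._2));
    }
  }
}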
I took a pass at the code. I think we are shuffling data again for the
reduceByKey step in this approach? For MOR, note that this is unnecessary,
since we simply log the records and there is no merge. This approach might
give better parallelism for merging when that's costly. But ultimately, our
write parallelism is limited by the number of affected files, right? So it's
not clear to me that this would always be a win.

On the code itself:
https://github.com/BigDataArtisans/incubator-hudi/blob/new-cow-merge/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java#L546
We cannot collect() the recordRDD at all; it will OOM the driver. :)

Orthogonally, one thing we could think of is doing a sort-based merge: i.e.,
repartitionAndSortWithinPartitions() the input records to the merge handle,
and if the file is also sorted on disk (it's not today), then we can use a
merge-sort-like algorithm to perform the merge. We can probably write code
to bear the one-time sorting cost. This would eliminate the need for memory
for merging altogether.

On Wed, Feb 26, 2020 at 10:11 PM lamberken <[email protected]> wrote:

> Hi Vinoth,
>
> > What do you mean by spark built-in operators?
> We may no longer need to depend on ExternalSpillableMap when upserting to
> a COW table.
>
> > Are you suggesting that we perform the merging in SQL?
> No, just use Spark built-in operators like mapToPair, reduceByKey, etc.
>
> Details are described in this article [1]. I have also finished a draft
> implementation and tests [2], mainly modifying the
> HoodieWriteClient#upsertRecordsInternal method.
>
> [1] https://docs.google.com/document/d/1-EHHfemtwtX2rSySaPMjeOAUkg5xfqJCKLAETZHa7Qw/edit?usp=sharing
> [2] https://github.com/BigDataArtisans/incubator-hudi/blob/new-cow-merge/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
>
> At 2020-02-27 13:45:57, "Vinoth Chandar" <[email protected]> wrote:
> > Hi lamber-ken,
> >
> > Thanks for this. I am not quite following the proposal. What do you mean
> > by spark built-in operators? Don't we use the RDD-based Spark operations?
> >
> > Are you suggesting that we perform the merging in SQL? Not following.
> > Please clarify.
> >
> > On Wed, Feb 26, 2020 at 10:08 AM lamberken <[email protected]> wrote:
> >
> >> Hi guys,
> >>
> >> Motivation:
> >> Improve the merge performance for COW tables on upsert, by handling the
> >> merge operation with Spark built-in operators.
> >>
> >> Background:
> >> When doing an upsert operation, for each bucket, Hudi needs to put the
> >> new input records into an in-memory cache map, and needs an external map
> >> that spills its contents to disk when there is insufficient space for it
> >> to grow.
> >>
> >> There are several performance issues:
> >> 1. We may need an external disk map, and serialize/deserialize records
> >> 2. Only a single thread does the I/O when checking the map
> >> 3. We can't take advantage of built-in Spark operators
> >>
> >> Based on the above, I reworked the merge logic and did a draft test.
> >> If you are also interested in this, please go ahead and read the doc [1];
> >> any suggestions are welcome. :)
> >>
> >> Thanks,
> >> Lamber-Ken
> >>
> >> [1] https://docs.google.com/document/d/1-EHHfemtwtX2rSySaPMjeOAUkg5xfqJCKLAETZHa7Qw/edit?usp=sharing
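To make the background at the end of the quoted thread concrete, here is a minimal, hypothetical sketch of the spill-to-disk map pattern described there. It is not Hudi's ExternalSpillableMap, whose real implementation differs; the entry-count budget and plain Java serialization are simplifications chosen only to show where the serialize/deserialize cost comes from.

import java.io.*;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: entries live in memory until a budget is exceeded,
// then overflow values are serialized to a temp file. NOT Hudi's
// ExternalSpillableMap; for illustration of the cost model only.
public class SpillableMapSketch<K extends Serializable, V extends Serializable> {
  private final Map<K, V> inMemory = new HashMap<>();
  private final Map<K, Long> diskOffsets = new HashMap<>();
  private final int maxInMemoryEntries;  // crude stand-in for a byte budget
  private final File spillFile;
  private long writePos = 0;

  public SpillableMapSketch(int maxInMemoryEntries) throws IOException {
    this.maxInMemoryEntries = maxInMemoryEntries;
    this.spillFile = File.createTempFile("spillable-map", ".bin");
    this.spillFile.deleteOnExit();
  }

  public void put(K key, V value) throws IOException {
    if (inMemory.size() < maxInMemoryEntries || inMemory.containsKey(key)) {
      inMemory.put(key, value);
      return;
    }
    // Over budget: serialize the value to disk and remember its offset.
    try (RandomAccessFile raf = new RandomAccessFile(spillFile, "rw")) {
      raf.seek(writePos);
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
        oos.writeObject(value);  // the serialization cost the thread debates
      }
      byte[] bytes = bos.toByteArray();
      raf.writeInt(bytes.length);
      raf.write(bytes);
      diskOffsets.put(key, writePos);
      writePos = raf.getFilePointer();
    }
  }

  @SuppressWarnings("unchecked")
  public V get(K key) throws IOException, ClassNotFoundException {
    if (inMemory.containsKey(key)) {
      return inMemory.get(key);
    }
    Long offset = diskOffsets.get(key);
    if (offset == null) {
      return null;
    }
    // Every disk hit pays a deserialization cost, on a single thread.
    try (RandomAccessFile raf = new RandomAccessFile(spillFile, "r")) {
      raf.seek(offset);
      byte[] bytes = new byte[raf.readInt()];
      raf.readFully(bytes);
      try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
        return (V) ois.readObject();
      }
    }
  }
}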

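Finally, a minimal sketch of the sort-based merge idea from earlier in the thread. Assumptions: String keys and payloads stand in for record keys and HoodieRecord payloads, a single-partition HashPartitioner stands in for partitioning by file group, and a hard-coded iterator stands in for a key-sorted base file (which, as noted above, does not exist on disk today).

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class SortedMergeSketch {

  // Merge two key-sorted iterators: incoming records win over base records.
  static void sortedMerge(Iterator<Tuple2<String, String>> base,
                          Iterator<Tuple2<String, String>> incoming) {
    Tuple2<String, String> b = base.hasNext() ? base.next() : null;
    Tuple2<String, String> in = incoming.hasNext() ? incoming.next() : null;
    while (b != null || in != null) {
      if (in == null || (b != null && b._1.compareTo(in._1) < 0)) {
        emit(b);                                   // unchanged base record
        b = base.hasNext() ? base.next() : null;
      } else if (b == null || b._1.compareTo(in._1) > 0) {
        emit(in);                                  // brand-new insert
        in = incoming.hasNext() ? incoming.next() : null;
      } else {
        emit(in);                                  // update: incoming replaces base
        b = base.hasNext() ? base.next() : null;
        in = incoming.hasNext() ? incoming.next() : null;
      }
    }
  }

  static void emit(Tuple2<String, String> rec) {
    System.out.println(rec._1 + " -> " + rec._2);  // stand-in for writing the new file slice
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("sorted-merge-sketch").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // Incoming upserts; in Hudi this would be keyed by (fileId, recordKey)
      // so that each partition maps to one file group.
      JavaPairRDD<String, String> incoming = jsc.parallelizePairs(Arrays.asList(
          new Tuple2<>("k2", "new-v2"), new Tuple2<>("k4", "v4")));

      // Sort within each write partition as part of the shuffle itself.
      JavaPairRDD<String, String> sorted =
          incoming.repartitionAndSortWithinPartitions(new HashPartitioner(1));

      sorted.foreachPartition(it -> {
        // Hard-coded, key-sorted stand-in for the base file's record iterator.
        Iterator<Tuple2<String, String>> base = Arrays.asList(
            new Tuple2<>("k1", "v1"), new Tuple2<>("k2", "v2"),
            new Tuple2<>("k3", "v3")).iterator();
        sortedMerge(base, it);
      });
    }
  }
}

The trade-off, as the thread notes, is paying a one-time sorting cost (and keeping base files sorted from then on) in exchange for constant merge memory: the merge itself only ever holds one record from each side.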