[ 
https://issues.apache.org/jira/browse/HUDI-1013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17142505#comment-17142505
 ] 

Vinoth Chandar commented on HUDI-1013:
--------------------------------------

Actually, the numbers above were based on an intermediate implementation, which 
was inadvertently doing an RDD conversion.

Below are the numbers with this fixed. 

 
I also sent the Dataset<InterimWriteStatus> all the way through commit 
and post-commit, so as to avoid the extra toRdd conversion. 
 
 
|Laptop : jmh| | |
|1M records, 100 parallelism| | |
|3 iterations|Avg time in secs|Notes|
|BulkInsert|224.808|Existing code path|
|BulkInsertRowsHudiDirectWrite|71.807|Direct write to parquet with mapPartitionsFunc and ParquetWriteSupport|
|benchmarkBulkInsertRows|55.175|New code path for bulk insert using Dataset<Row>|
|benchmarkDirectParquetWrites|60.043|Datasource parquet writes|
| | | |
|Cluster (1 to 2 rounds)| | |
|5M records, 100 parallelism| | |
|BulkInsert|138 secs|Existing code path|
|BulkInsertRowsHudiDirectWrite|55 secs|Direct write to parquet with mapPartitionsFunc and ParquetWriteSupport|
|benchmarkBulkInsertRows|64 secs|New code path for bulk insert using Dataset<Row>|
|benchmarkDirectParquetWrites|57 secs|Datasource parquet writes|
| | | |
|Cluster| | |
|10M records, 100 parallelism| | |
|BulkInsert|206 secs|Existing code path|
|BulkInsertRowsHudiDirectWrite|110 secs|Direct write to parquet with mapPartitionsFunc and ParquetWriteSupport|
|benchmarkBulkInsertRows|114 secs|New code path for bulk insert using Dataset<Row>|
|benchmarkDirectParquetWrites|96 secs|Datasource parquet writes|
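
To make the comparison easier to read, here is a small sketch that turns the timings above into ratios. The names (Run, speedup, vsParquet) are made up for illustration and are not part of Hudi; the numbers are copied from the tables as-is.

```scala
object BulkInsertSpeedups {
  // Average wall-clock seconds copied from the tables above.
  // `Run` is a hypothetical holder type, not a Hudi class.
  final case class Run(setup: String, bulkInsert: Double, rowPath: Double, directParquet: Double)

  val runs: Seq[Run] = Seq(
    Run("Laptop, 1M records", 224.808, 55.175, 60.043),
    Run("Cluster, 5M records", 138.0, 64.0, 57.0),
    Run("Cluster, 10M records", 206.0, 114.0, 96.0)
  )

  // How many times faster the Dataset<Row> path (benchmarkBulkInsertRows)
  // is than the existing BulkInsert path.
  def speedup(r: Run): Double = r.bulkInsert / r.rowPath

  // Time of the Dataset<Row> path relative to the plain datasource
  // Parquet write (1.0 means parity, below 1.0 means faster).
  def vsParquet(r: Run): Double = r.rowPath / r.directParquet

  def main(args: Array[String]): Unit =
    runs.foreach { r =>
      println(f"${r.setup}%-22s speedup=${speedup(r)}%.2fx  vs parquet=${vsParquet(r)}%.2f")
    }
}
```

On these numbers the new path is roughly 4.1x, 2.2x and 1.8x faster than the existing one, and within about 20% of the plain Parquet write in all three setups.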
 
 

> Bulk Insert w/o converting to RDD
> ---------------------------------
>
>                 Key: HUDI-1013
>                 URL: https://issues.apache.org/jira/browse/HUDI-1013
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: Writer Core
>            Reporter: sivabalan narayanan
>            Assignee: sivabalan narayanan
>            Priority: Blocker
>             Fix For: 0.6.0
>
>
> Our bulk insert (and in fact every other operation) does a Dataset to 
> RDD conversion in HoodieSparkSqlWriter, and our HoodieClient deals with 
> JavaRDD<HoodieRecord>s. We are trying to see if we can improve 
> performance by avoiding the RDD conversion. We will start with bulk 
> insert and get it working end to end, then do some perf analysis before 
> deciding whether to do this for other operations too. 
>  
> At a high level, this is the idea:
> 1. Dataset<Row> will be passed in all the way from spark sql writer to the 
> storage writer. We do not convert to HoodieRecord at any point in time. 
> 2. We need to use 
> [ParquetWriteSupport|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala]
>  to write to Parquet as InternalRows.
> 3. So the gist of what we want to do is: take the Dataset<Row>, sort by 
> partition path and record key, repartition by the parallelism config, and do 
> mapPartitions. Within mapPartitions, we iterate through the Rows, encode 
> them to InternalRows, and write to Parquet using the write support linked above. 
> We first wanted to check whether this strategy actually improves perf. So 
> I did a quick hack of just the mapPartitions func in HoodieSparkSqlWriter 
> to see what the numbers look like. Check for operation 
> "bulk_insert_direct_parquet_write_support" 
> [here|#diff-5317f4121df875e406876f9f0f012fac]]. 
> These are the numbers I got. (1) is the existing hoodie bulk insert, which does 
> the RDD conversion to JavaRDD<HoodieRecord>. (2) is writing directly to 
> Parquet in Spark; code given below. (3) is the modified hoodie code (i.e. 
> operation bulk_insert_direct_parquet_write_support).
>  
> | |5M records 100 parallelism input size 2.5 GB|
> |(1) Orig hoodie(unmodified)|169 secs. output size 2.7 GB|
> |(2) Parquet |62 secs. output size 2.5 GB|
> |(3) Modified hudi code. Direct Parquet Write |73 secs. output size 2.5 GB|
>  
> So, essentially, our existing bulk insert takes more than 2x as long as the 
> plain Parquet write. Our modified hudi code (i.e. operation 
> bulk_insert_direct_parquet_write_support) is close to the direct Parquet write in 
> Spark, which shows that our strategy should work. 
> // This is the Parquet write in Spark, (2) above. 
> transformedDF.sort("partition", "key")
>   .coalesce(parallelism)
>   .write.format("parquet")
>   .partitionBy("partition")
>   .mode(saveMode)
>   .save(s"$outputPath/$format")
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
