[ https://issues.apache.org/jira/browse/HUDI-1013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17170477#comment-17170477 ]
Vinoth Chandar commented on HUDI-1013:
--------------------------------------
[~shivnarayan] should we also enhance this to respect the sort modes that
Ethan introduced?
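A minimal sketch of what "respecting the sort modes" could mean for the row-based path, assuming the modes in question are a global sort by (partition path, record key), a sort only within each Spark partition, and no sort at all. This is a pure-Scala model, not Spark or Hudi code: `Rec`, `SortMode`, `split` and `applySortMode` are hypothetical names, and a Spark partition is modeled as a `Seq`.

```scala
// Hypothetical model: a record carries its partition path and record key,
// and a "Spark partition" is just a Seq of records.
final case class Rec(partitionPath: String, recordKey: String)

sealed trait SortMode
case object GlobalSort extends SortMode    // total order on (partitionPath, recordKey)
case object PartitionSort extends SortMode // order only inside each Spark partition
case object NoSort extends SortMode        // keep the input order as is

val byPathThenKey: Ordering[Rec] = Ordering.by((r: Rec) => (r.partitionPath, r.recordKey))

// Cut `rows` into at most `n` contiguous, order-preserving chunks.
def split(rows: Seq[Rec], n: Int): Seq[Seq[Rec]] = {
  val size = math.max(1, math.ceil(rows.size.toDouble / n).toInt)
  rows.grouped(size).toList
}

def applySortMode(parts: Seq[Seq[Rec]], mode: SortMode): Seq[Seq[Rec]] = mode match {
  case GlobalSort    => split(parts.flatten.sorted(byPathThenKey), parts.size)
  case PartitionSort => parts.map(_.sorted(byPathThenKey))
  case NoSort        => parts
}
```

In this model the global mode keeps all rows of one partition path together across tasks; the per-partition mode avoids moving rows between partitions, but a partition path can then show up in several tasks.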
> Bulk Insert w/o converting to RDD
> ---------------------------------
>
> Key: HUDI-1013
> URL: https://issues.apache.org/jira/browse/HUDI-1013
> Project: Apache Hudi
> Issue Type: Improvement
> Components: Writer Core
> Reporter: sivabalan narayanan
> Assignee: sivabalan narayanan
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 0.6.0
>
>
> Our bulk insert (in fact all operations, not just bulk insert) converts the
> Dataset to an RDD in HoodieSparkSqlWriter, and our HoodieClient deals with
> JavaRDD<HoodieRecord>s. We are trying to see if we can improve performance
> by avoiding the RDD conversion. We will start with bulk insert and get it
> working end to end, then decide, after some perf analysis, whether to do this
> for other operations too.
>
> At a high level, this is the idea:
> 1. A Dataset<Row> will be passed all the way from the Spark SQL writer to the
> storage writer. We do not convert to HoodieRecord at any point.
> 2. We need to use
> [ParquetWriteSupport|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala]
> to write to Parquet as InternalRows.
> 3. So the gist of what we want to do is: take the Dataset<Row>, sort by
> partition path and record key, repartition according to the parallelism
> config, and call mapPartitions. Within mapPartitions, we iterate through the
> Rows, encode them to InternalRows, and write to Parquet using the write
> support linked above.
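The three steps above can be sketched without Spark as a small Scala model. It assumes a row is a plain case class and that `encode` stands in for the Row-to-InternalRow encoding; `InputRow`, `Encoded`, `FileWritten`, `encode`, `writeTask` and `bulkInsert` are hypothetical names, not Hudi or Spark APIs. The point it illustrates: once rows are sorted by partition path, each task sees every partition path as one contiguous run, so it only needs one open writer at a time.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins, not Spark or Hudi types.
final case class InputRow(partitionPath: String, recordKey: String, payload: String)
final case class Encoded(bytes: String) // plays the role of an InternalRow
final case class FileWritten(partitionPath: String, task: Int, rows: Int)

// Placeholder for the Row -> InternalRow encoding step.
def encode(r: InputRow): Encoded = Encoded(s"${r.recordKey}:${r.payload}")

// One mapPartitions task. Because its input is sorted by partition path, each path
// arrives as one contiguous run, so a single writer is open at any time.
def writeTask(rows: Iterator[InputRow], task: Int): List[FileWritten] = {
  val files = ArrayBuffer.empty[FileWritten]
  val pending = ArrayBuffer.empty[Encoded] // rows of the currently open "file"
  var currentPath: Option[String] = None
  def closeFile(): Unit = currentPath.foreach { p =>
    files += FileWritten(p, task, pending.size)
    pending.clear()
  }
  for (r <- rows) {
    if (!currentPath.contains(r.partitionPath)) { // path changed: roll to a new file
      closeFile()
      currentPath = Some(r.partitionPath)
    }
    pending += encode(r) // a real writer would stream this to Parquet instead of buffering
  }
  closeFile()
  files.toList
}

// Steps 1-3: sort by (partition path, record key), cut into `parallelism`
// contiguous chunks (standing in for the repartition), and run writeTask on each.
def bulkInsert(rows: Seq[InputRow], parallelism: Int): List[FileWritten] = {
  val sorted = rows.sortBy(r => (r.partitionPath, r.recordKey))
  val chunkSize = math.max(1, math.ceil(sorted.size.toDouble / parallelism).toInt)
  sorted.grouped(chunkSize).toList.zipWithIndex.flatMap { case (chunk, task) =>
    writeTask(chunk.iterator, task)
  }
}
```

For example, four rows spread over two partition paths with `parallelism = 2` give one file per task; with `parallelism = 1` a single task writes two files, closing the first when the path changes.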
> We first wanted to check whether this strategy would actually improve
> performance, so I did a quick hack of just the mapPartitions function in
> HoodieSparkSqlWriter to see what the numbers look like. See the operation
> "bulk_insert_direct_parquet_write_support"
> [here|#diff-5317f4121df875e406876f9f0f012fac].
> These are the numbers I got: (1) is the existing Hudi bulk insert, which does
> the conversion to JavaRDD<HoodieRecord>s; (2) writes directly to Parquet in
> Spark (code given below); (3) is the modified Hudi code, i.e. operation
> bulk_insert_direct_parquet_write_support.
>
> ||Run (5M records, parallelism 100, input size 2.5 GB)||Time||Output size||
> |(1) Original Hudi (unmodified)|169 secs|2.7 GB|
> |(2) Plain Parquet|62 secs|2.5 GB|
> |(3) Modified Hudi code, direct Parquet write|73 secs|2.5 GB|
>
> So our existing bulk insert takes more than 2x as long as a plain Parquet
> write. Our modified Hudi code (operation
> bulk_insert_direct_parquet_write_support) comes close to the direct Parquet
> write in Spark, which indicates that our strategy should work.
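To make the comparison concrete, the ratios behind the 2x claim can be computed directly from the timings in the table; the numbers are the ones reported above and the variable names are illustrative only.

```scala
// Timings in seconds, taken from the table above (5M records, parallelism 100).
val origHudiSecs = 169.0    // (1) existing bulk insert via JavaRDD<HoodieRecord>
val plainParquetSecs = 62.0 // (2) plain Parquet write in Spark
val directWriteSecs = 73.0  // (3) bulk_insert_direct_parquet_write_support

val origVsParquet = origHudiSecs / plainParquetSecs      // ~2.73: more than 2x slower
val directVsParquet = directWriteSecs / plainParquetSecs // ~1.18: about 18% overhead
val speedup = origHudiSecs / directWriteSecs             // ~2.32x faster than (1)

println(f"(1)/(2) = $origVsParquet%.2f, (3)/(2) = $directVsParquet%.2f, (1)/(3) = $speedup%.2f")
```

So the direct write sits roughly 18% above plain Parquet, while the current path sits roughly 173% above it.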
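> // This is the plain Parquet write in Spark, (2) above.
> // transformedDF, parallelism, saveMode, outputPath and format are not shown here.
> transformedDF.sort("partition", "key") // sort by partition path, then record key
>   .coalesce(parallelism)                // reduce to `parallelism` partitions without a full shuffle
>   .write.format("parquet")
>   .partitionBy("partition")             // one output directory per partition value
>   .mode(saveMode)
>   .save(s"$outputPath/$format")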
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)