[
https://issues.apache.org/jira/browse/HUDI-494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17008189#comment-17008189
]
lamber-ken edited comment on HUDI-494 at 1/4/20 11:33 PM:
----------------------------------------------------------
hi [~garyli1019], I guess your dataset contains many partitions, you can check
this option *PARTITIONPATH_FIELD_OPT_KEY*.
hi, [~vinoth] WDYT?
*Reprodut steps*
1, steup hudi
{code:java}
${SPARK_HOME}/bin/spark-shell --packages
org.apache.hudi:hudi-spark-bundle:0.5.0-incubating --conf
'spark.serializer=org.apache.spark.serializer.KryoSerializer'{code}
2, insert data
{code:java}
import org.apache.spark.sql.SaveMode._val tableName = "hudi_mor_table"
val basePath = "file:///tmp/hudi_mor_table"var datas = List("""{ "name":
"kenken", "ts": 1574297893836, "age": "12a", "location": "latitude"}""")
val df = spark.read.json(spark.sparkContext.parallelize(datas, 2))
df.write.format("org.apache.hudi").
option("hoodie.insert.shuffle.parallelism", "10").
option("hoodie.upsert.shuffle.parallelism", "10").
option("hoodie.delete.shuffle.parallelism", "10").
option("hoodie.bulkinsert.shuffle.parallelism", "10").
option("hoodie.datasource.write.recordkey.field", "name").
option("hoodie.datasource.write.partitionpath.field", "location").
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.table.name", tableName).
mode(Overwrite).
save(basePath)
var datas = List.tabulate(30000)(i => s"""{ "name": "kenken", "ts": "zasz",
"age": 123, "location": "latitude${i}"}""")
val df = spark.read.json(spark.sparkContext.parallelize(datas, 2))
df.write.format("org.apache.hudi").
option("hoodie.insert.shuffle.parallelism", "10").
option("hoodie.upsert.shuffle.parallelism", "10").
option("hoodie.delete.shuffle.parallelism", "10").
option("hoodie.bulkinsert.shuffle.parallelism", "10").
option("hoodie.datasource.write.recordkey.field", "name").
option("hoodie.datasource.write.partitionpath.field", "location").
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.table.name", "hudi_mor_table").
mode(Append).
save("file:///tmp/hudi_mor_table")
{code}
3, generate many tasks && temp files
{code:java}
/tmp/hudi_mor_table/.hoodie/.temp/20200105071257/*
{code}
4, spark ui
!image-2020-01-05-07-30-53-567.png!
was (Author: lamber-ken):
hi [~garyli1019], I guess your dataset contains many partitions, you can check
this option *PARTITIONPATH_FIELD_OPT_KEY*.
hi, [~vinoth] WDYT?
*Reprodut steps*
1, steup hudi
{code:java}
${SPARK_HOME}/bin/spark-shell --packages
org.apache.hudi:hudi-spark-bundle:0.5.0-incubating --conf
'spark.serializer=org.apache.spark.serializer.KryoSerializer'{code}
2, insert data
{code:java}
import org.apache.spark.sql.SaveMode._val tableName = "hudi_mor_table"
val basePath = "file:///tmp/hudi_mor_table"var datas = List("""{ "name":
"kenken", "ts": 1574297893836, "age": "12a", "location": "latitude"}""")
val df = spark.read.json(spark.sparkContext.parallelize(datas, 2))
df.write.format("org.apache.hudi").
option("hoodie.insert.shuffle.parallelism", "10").
option("hoodie.upsert.shuffle.parallelism", "10").
option("hoodie.delete.shuffle.parallelism", "10").
option("hoodie.bulkinsert.shuffle.parallelism", "10").
option("hoodie.datasource.write.recordkey.field", "name").
option("hoodie.datasource.write.partitionpath.field", "location").
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.table.name", tableName).
mode(Overwrite).
save(basePath)
var datas = List.tabulate(30000)(i => s"""{ "name": "kenken", "ts": "zasz",
"age": 123, "location": "latitude${i}"}""")
val df = spark.read.json(spark.sparkContext.parallelize(datas, 2))
df.write.format("org.apache.hudi").
option("hoodie.insert.shuffle.parallelism", "10").
option("hoodie.upsert.shuffle.parallelism", "10").
option("hoodie.delete.shuffle.parallelism", "10").
option("hoodie.bulkinsert.shuffle.parallelism", "10").
option("hoodie.datasource.write.recordkey.field", "name").
option("hoodie.datasource.write.partitionpath.field", "location").
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.table.name", "hudi_mor_table").
mode(Append).
save("file:///tmp/hudi_mor_table")
{code}
3, generate many tasks && temp files
{code:java}
/tmp/hudi_mor_table/.hoodie/.temp/20200105071257/*
{code}
4, spark ui
!image-2020-01-05-07-30-53-567.png!
> [DEBUGGING] Huge amount of tasks when writing files into HDFS
> -------------------------------------------------------------
>
> Key: HUDI-494
> URL: https://issues.apache.org/jira/browse/HUDI-494
> Project: Apache Hudi (incubating)
> Issue Type: Test
> Reporter: Yanjia Gary Li
> Assignee: Vinoth Chandar
> Priority: Major
> Attachments: Screen Shot 2020-01-02 at 8.53.24 PM.png, Screen Shot
> 2020-01-02 at 8.53.44 PM.png, image-2020-01-05-07-30-53-567.png
>
>
> I am using the manual build master after
> [https://github.com/apache/incubator-hudi/commit/36b3b6f5dd913d3f1c9aa116aff8daf6540fed65]
> commit. EDIT: tried with the latest master but got the same result
> I am seeing 3 million tasks when the Hudi Spark job writing the files into
> HDFS. It seems like related to the input size. With 7.7 GB input it was 3.2
> million tasks, with 9 GB input it was 3.7 million. Both with 10 parallelisms.
> I am seeing a huge amount of 0 byte files being written into .hoodie/.temp/
> folder in my HDFS. In the Spark UI, each task only writes less than 10
> records in
> {code:java}
> count at HoodieSparkSqlWriter{code}
> All the stages before this seem normal. Any idea what happened here? My
> first guess would be something related to the bloom filter index. Maybe
> somewhere trigger the repartitioning with the bloom filter index? But I am
> not really familiar with that part of the code.
> Thanks
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)