vinothchandar commented on issue #1328: Hudi upsert hangs URL: https://github.com/apache/incubator-hudi/issues/1328#issuecomment-588341138 I ported your code to Scala and am looking into the issue now. Will keep you posted. ``` val HUDI_FORMAT = "org.apache.hudi" val TABLE_NAME = "hoodie.table.name" val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field" val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field" val OPERATION_OPT_KEY = "hoodie.datasource.write.operation" val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert" val UPSERT_OPERATION_OPT_VAL = "upsert" val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism" val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism" val config = Map( "table_name" -> "example_table", "target" -> "file:///tmp/example_table/", "primary_key" -> "id", "sort_key" -> "id" ) val readPath = config("target") + "/*" val json_data = (1 to 4000000).map(i => "{\"id\":" + i + "}") val jsonRDD = spark.sparkContext.parallelize(json_data, 2) val df1 = spark.read.json(jsonRDD) println(s"${df1.count()} records in source 1") df1.printSchema // Runs quick df1.write.format(HUDI_FORMAT) .option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")) .option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")) .option(TABLE_NAME, config("table_name")) .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL) .option(BULK_INSERT_PARALLELISM, 1) .mode("Overwrite") .save(config("target")) println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table") // Runs quick df1.limit(3000000).write.format(HUDI_FORMAT) .option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")) .option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")) .option(TABLE_NAME, config("table_name")) .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL) .option(UPSERT_PARALLELISM, 20) .mode("Append") .save(config("target")) println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table") // Runs very slow 
df1.write.format(HUDI_FORMAT) .option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")) .option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")) .option(TABLE_NAME, config("table_name")) .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL) .option(UPSERT_PARALLELISM, 20) .mode("Append") .save(config("target")) println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table") ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
