vontman opened a new issue #1498: [SUPPORT] Migrating parquet table to Hudi
URL: https://github.com/apache/incubator-hudi/issues/1498

**Describe the problem you faced**

I have questions about the initial loading of a Hudi table (migrating from parquet to a Hudi table via bulk-insert), because we encountered significantly high loading times. First, the details for both tables we tried to load, the Spark conf, the Hudi conf, and further modifications.

Sample of attempts:

**Table1**: 6.7GB parquet, 180M records, 16 columns, key is a composite of 2 columns. Spark conf: 1 executor, 12 cores, 16GB, 32 shuffle partitions, 32 bulk-insert parallelism.

**Table2**: 21GB parquet, 600M records, 16 columns, key is a composite of 2 columns. Spark conf: 4 executors, 8 cores, 32GB, 128 shuffle partitions, 128 bulk-insert parallelism.

**Table 1 loading time**: 25 min.
**Table 2 loading time**: 47 min.

Both tables are read from and written to the local file system.

**To Reproduce**

Code sample used:

```scala
import cluster.SparkConf
import common.DataConfig._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SparkSession

object HudiFilewriter {
  val COW = "COW"
  val MOR = "MOR"

  def main(args: Array[String]): Unit = {
    val tableName = args(0)
    val basePath = args(1)
    val tableType =
      if (COW.equalsIgnoreCase(args(2))) COW_TABLE_TYPE_OPT_VAL
      else MOR_TABLE_TYPE_OPT_VAL
    val rawTablePath = args(3)
    val partitionCol = args(4)

    val spark = SparkSession.builder().getOrCreate()
    val logLevel = spark.sparkContext.getConf.get(SparkConf.LOG_LEVEL)
    spark.sparkContext.setLogLevel(logLevel)
    val shuffle = spark.sparkContext.getConf.get(SparkConf.SHUFFLE_PARTITIONS)

    val hudiOptions = Map[String, String](
      // HoodieWriteConfig
      TABLE_NAME -> tableName,
      "hoodie.bulkinsert.shuffle.parallelism" -> shuffle,
      // DataSourceWriteOptions
      TABLE_TYPE_OPT_KEY -> tableType,
      PRECOMBINE_FIELD_OPT_KEY -> UPDATE_COL,
      KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.ComplexKeyGenerator",
      RECORDKEY_FIELD_OPT_KEY -> KEY_COLS.mkString(","),
      PARTITIONPATH_FIELD_OPT_KEY -> partitionCol,
      OPERATION_OPT_KEY -> BULK_INSERT_OPERATION_OPT_VAL
    )

    spark.time {
      val df = spark.read.parquet(rawTablePath)
      df.write.format("org.apache.hudi")
        .options(hudiOptions)
        .mode(Overwrite)
        .save(basePath)
    }
  }
}
```

**Expected behavior**

Performance similar to vanilla parquet writing, plus the additional sort overhead.

**Environment Description**

* Hudi version : 0.5.2
* Spark version : 2.4.5
* Hive version : NA
* Hadoop version : NA
* Storage (HDFS/S3/GCS..) : Local file system
* Running on Docker? (yes/no) : no

**Additional context**

**Attempts**:

- We tried several different Spark configurations: increasing the shuffle and bulk-insert parallelism, increasing the number of executors while keeping the same total resources, and increasing the memory thresholds of the driver/executors.
- Hudi table types (MOR partitioned and non-partitioned, COW partitioned and non-partitioned); for the partitioned tables we provided a partitioned version of the base table along with the partition column(s).
- Hudi and Spark versions: "hudi-spark-bundle" % "0.5.1-incubating", Spark 2.4.3, "spark-avro" % "2.4.3".
- Upgraded Hudi and Spark versions: "hudi-spark-bundle" % "0.5.2-incubating", Spark 2.4.5, "spark-avro" % "2.4.5".
- Base data preparation: sorted by keys or pre-partitioned.
- Loading the data partition by partition: filter the base table on the partition column and bulk-insert each resulting DataFrame, so that each partition individually uses the whole application's resources during the write; a new application per partition.

None of the above attempts improved the loading time much, and some made it worse. So I would like to know:

- Is this the normal time for an initial load of a Hudi table, or are we doing something wrong?
- Do we need a better cluster/more resources to load the data for the first time? The Hudi confluence page mentions that a COW bulk-insert should match vanilla parquet writing plus a sort.
- Does partitioning improve the upsert and/or compaction time of Hudi tables, or does it only improve analytical queries (partition pruning)?
- We have noticed that most of the time is spent in the data indexing (the bulk-insert logic itself) rather than in the sorting stages that precede it. How can we improve that? Should we provide our own indexing logic?
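For context on the "sorted by keys" preparation above: our understanding (an assumption, based on reading Hudi 0.5.x sources, not confirmed by the docs) is that `ComplexKeyGenerator` builds the record key by joining `field:value` pairs with commas, which is why sorting the base data by the raw key columns should roughly approximate sorting by record key. A minimal standalone sketch of that assumed key format (`recordKeySketch` is our own hypothetical helper, not a Hudi API):

```scala
object ComplexKeySketch {
  // Hypothetical reimplementation of the record-key format we believe
  // ComplexKeyGenerator produces in Hudi 0.5.x: "field1:val1,field2:val2".
  def recordKeySketch(fields: Seq[(String, String)]): String =
    fields.map { case (field, value) => s"$field:$value" }.mkString(",")

  def main(args: Array[String]): Unit = {
    // Composite key over two columns, as in our tables.
    println(recordKeySketch(Seq("id" -> "42", "region" -> "eu")))
  }
}
```

If this format is right, it would also mean the per-record key-generation and index-lookup cost scales with the number of key columns, which may matter at 600M records.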
