vontman opened a new issue #1498: Migrating parquet table to hudi issue 
[SUPPORT]
URL: https://github.com/apache/incubator-hudi/issues/1498
 
 
   
   
   **Describe the problem you faced**
   
   I have questions about the initial loading of a Hudi table (migrating a parquet table to a Hudi table via bulk-insert), because we encountered significantly high loading times. First, let me add the details for both tables we tried to load, the Spark conf, the Hudi conf, and further modifications.
   
   Sample of attempts:
   **Table1**: 6.7 GB parquet, 180M records, 16 columns, key is a composite of 2 columns. Spark conf: 1 executor, 12 cores, 16 GB memory, 32 shuffle partitions, 32 bulk-insert parallelism.
   **Table2**: 21 GB parquet, 600M records, 16 columns, key is a composite of 2 columns. Spark conf: 4 executors, 8 cores, 32 GB memory, 128 shuffle partitions, 128 bulk-insert parallelism.
   **Table 1 loading time**: 25 min.
   **Table 2 loading time**: 47 min.
   Both tables read and write from/to local file system.
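   For concreteness, a `spark-submit` invocation matching Table2's settings might look like the sketch below. The jar name and paths are placeholders, not taken from our actual job, and the Kryo serializer line reflects Hudi's general recommendation rather than anything specific to this run:

   ```shell
   # Hypothetical spark-submit for Table2; jar name and paths are placeholders.
   spark-submit \
     --class HudiFilewriter \
     --num-executors 4 \
     --executor-cores 8 \
     --executor-memory 32g \
     --conf spark.sql.shuffle.partitions=128 \
     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
     hudi-loader.jar table2 /tmp/hudi/table2 COW /tmp/raw/table2 partition_col
   ```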
   
   **To Reproduce**
   
   Code sample used:
   ```scala
   import cluster.SparkConf
   import common.DataConfig._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   import org.apache.spark.sql.SaveMode._
   import org.apache.spark.sql.SparkSession
   object HudiFilewriter {
   
     val COW = "COW"
     val MOR = "MOR"
   
     def main(args: Array[String]): Unit = {
       val tableName = args(0)
       val basePath = args(1)
       val tableType = if(COW.equalsIgnoreCase(args(2))) COW_TABLE_TYPE_OPT_VAL 
else MOR_TABLE_TYPE_OPT_VAL
       val rawTablePath = args(3)
       val partitionCol = args(4)
   
       val spark = SparkSession.builder()
         .getOrCreate()
   
       val logLevel = spark.sparkContext.getConf.get(SparkConf.LOG_LEVEL)
       spark.sparkContext.setLogLevel(logLevel)
   
       val shuffle = 
spark.sparkContext.getConf.get(SparkConf.SHUFFLE_PARTITIONS)
   
   
    // options map is never mutated, so a val is sufficient
    val hudiOptions = Map[String, String](
   
         //HoodieWriteConfig
         TABLE_NAME -> tableName,
         "hoodie.bulkinsert.shuffle.parallelism" -> shuffle,
   
         //DataSourceWriteOptions
         TABLE_TYPE_OPT_KEY -> tableType,
         PRECOMBINE_FIELD_OPT_KEY -> UPDATE_COL,
         KEYGENERATOR_CLASS_OPT_KEY -> 
"org.apache.hudi.keygen.ComplexKeyGenerator",
         RECORDKEY_FIELD_OPT_KEY -> KEY_COLS.mkString(","),
         PARTITIONPATH_FIELD_OPT_KEY -> partitionCol,
         OPERATION_OPT_KEY -> BULK_INSERT_OPERATION_OPT_VAL
   
       )
   
        spark.time {
          val df = spark.read.parquet(rawTablePath)

          df.write.format("org.apache.hudi").
            options(hudiOptions).
            mode(Overwrite).
            save(basePath)
        }
      }
    }
    ```
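   For reference, the `ComplexKeyGenerator` configured above builds the record key by joining `field:value` pairs for the configured key columns. Below is a minimal pure-Scala sketch of that key format, written only to illustrate the expected shape of the composite key; it is not the actual Hudi class, and the column names are made up:

   ```scala
   object ComplexKeySketch {
     // Sketch of the composite record-key format for multiple key columns:
     // each configured field contributes "field:value", joined with commas.
     def recordKey(fields: Seq[(String, String)]): String =
       fields.map { case (name, value) => s"$name:$value" }.mkString(",")

     def main(args: Array[String]): Unit = {
       // e.g. a key composed of two columns, as in the tables above
       println(recordKey(Seq("order_id" -> "42", "region" -> "eu")))
     }
   }
   ```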
   
   
   **Expected behavior**
   
   Performance similar to vanilla parquet writing, plus the additional sort overhead.
   
   **Environment Description**
   
   * Hudi version : 0.5.2
   
   * Spark version : 2.4.5
   
   * Hive version : NA
   
   * Hadoop version : NA
   
   * Storage (HDFS/S3/GCS..) : Local file System
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   **Attempts**:
   - We tried multiple Spark configurations: increasing the shuffle and bulk-insert parallelism, increasing the number of executors while keeping the total resources constant, and increasing the memory allocated to the driver/executors.
   
   - Both Hudi table types, MOR and COW, each partitioned and non-partitioned; for partitioned tables we provided a partitioned version of the base table along with the partition column(s).
   - Hudi and Spark versions: "hudi-spark-bundle" % "0.5.1-incubating", Spark 2.4.3, "spark-avro" % "2.4.3".
   - Upgraded Hudi and Spark versions: "hudi-spark-bundle" % "0.5.2-incubating", Spark 2.4.5, "spark-avro" % "2.4.5".
   
   - Preparing the base data in advance, sorted by keys or pre-partitioned.
   
   - Loading the data partition by partition: filtering the base table on the partition column and bulk-inserting each resulting dataframe, so that each partition individually uses the whole application's resources during the write; we also tried launching a new application for each partition.
   
   None of the above attempts improved the loading time noticeably; some made it worse.
   So I would like to know:
   - Is this the normal initial loading time for Hudi tables, or are we doing something wrong?
   - Do we need a better cluster/more resources to load the data for the first time? The Hudi confluence page mentions that COW bulk-insert should match vanilla parquet writing plus a sort.
   - Does partitioning improve the upsert and/or compaction time for Hudi tables, or does it only improve analytical queries (partition pruning)?
   - We noticed that most of the time is spent in the data indexing (the bulk-insert logic itself) rather than in the sorting stages before it, so how can we improve that? Should we provide our own indexing logic?
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
