asharma4-lucid opened a new issue #2269:
URL: https://github.com/apache/hudi/issues/2269


   I am trying to bulk insert a 5 GB Parquet file from S3 into a partitioned Hudi table in S3. The input Parquet file is written by an upstream process, with the 5 GB of data divided across 200 part files. The Hudi table load takes a very long time: there is no movement in the driver log, and in some cases it appears stuck at one place for ~4 hours, bringing the total execution time to ~8 hours. Of those 8 hours, the Spark UI shows the processing stages taking roughly 3 to 4 hours, but there is a large gap with no visible activity in the Spark UI while the driver log stays stuck at one position for ~4 hours. When I insert into a non-partitioned Hudi table instead, the write completes in 6 minutes without any issues. Another observation: when I write the Hudi table to HDFS instead of S3, the process completes in ~1 hour. Can you please advise on what is happening here, in terms of the driver log being stuck at one position, and how it can be mitigated?
   
   Attached is the log file, in which you can see that there is no movement in the driver log.
   
[log_2020_08_24.log](https://github.com/apache/hudi/files/5578693/log_2020_08_24.log)
   
   Below is the log snippet where the input 5 GB Parquet file is being written. This file is used in the second half of the process to write to the Hudi table.
   ```
   20/11/20 07:27:12 INFO DAGScheduler: Job 3 finished: save at HistLoader.scala:75, took 29.940154 s
   20/11/20 07:27:12 INFO MultipartUploadOutputStream: close closed:false s3://lucid-datalake-lt/temp_stage/_SUCCESS
   20/11/20 07:27:12 INFO FileFormatWriter: Write Job fb9c88a3-d50a-4451-b623-5d23981df40e committed.
   ```
   
   As advised, I am forcing a read of the above input file immediately after writing it, and that read completes very quickly, as can be seen from the logs.
   ```
   20/11/20 07:27:13 INFO DAGScheduler: ResultStage 6 (show at HistLoader.scala:79) finished in 0.339 s
   20/11/20 07:27:13 INFO DAGScheduler: Job 5 finished: show at HistLoader.scala:79, took 0.340348 s
   
   +-----------+------------+-----------+------+--------------------+--------------------+
   |survey_dbid|session_dbid|question_id|answer|       respondent_id|           timestamp|
   +-----------+------------+-----------+------+--------------------+--------------------+
   |    8292560|  6120232969|         44|    IN|734e0eac-53fd-455...|2020-08-24 18:32:...|
   |    8292560|  6120232969|      79400|    10|734e0eac-53fd-455...|2020-08-24 18:32:...|
   ```
   
   Below is a snippet from the attached log where the job appears to be stuck for half an hour:
   ```
   20/11/20 07:31:45 INFO ContextCleaner: Cleaned accumulator 13
   20/11/20 07:31:45 INFO BlockManagerInfo: Removed broadcast_0_piece0 on ip-10-0-5-165.fulcrumex.com:39631 in memory (size: 33.9 KB, free: 57.8 GB)
   20/11/20 08:04:17 INFO BlockManagerInfo: Added rdd_27_0 in memory on ip-10-0-5-170.fulcrumex.com:39049 (size: 719.7 KB, free: 51.4 GB)
   20/11/20 08:04:17 INFO TaskSetManager: Starting task 10.0 in stage 8.0 (TID 13447, ip-10-0-5-170.fulcrumex.com, executor 11, partition 10, RACK_LOCAL, 9867 bytes)
   ```
   
   Below is a snippet from the attached log where the job appears to be stuck for ~4 hrs 45 mins:
   ```
   20/11/20 09:28:11 INFO ContextCleaner: Cleaned accumulator 206
   20/11/20 09:28:11 INFO ContextCleaner: Cleaned accumulator 208
   20/11/20 14:16:46 INFO SparkContext: Starting job: collect at CleanActionExecutor.java:90
   20/11/20 14:16:46 INFO DAGScheduler: Got job 11 (collect at CleanActionExecutor.java:90) with 200 output partitions
   ```
   
   Following is the configuration that I am using:
   ```
   AWS EMR
   - Master - r5.4xlarge
   - 10 core nodes - r5.4xlarge
   - Release label: emr-5.31.0
   - Spark - 2.4.6
   - On account of the above, the Hudi version is 0.6.0 (hudi-spark-bundle_2.11-0.6.0-amzn-0.jar)
   ```
   
   Spark customizations: I changed the settings below because containers were getting killed by YARN during execution due to insufficient memory.
   ```
   "spark.executor.memory":"98971M",
   "spark.driver.memory":"98971M"
   "spark.executor.cores":"1" - I reduced the number of cores from the default 
4 to 1, assuming that each executor will have more memory at its disposal on 
account of the memory issues that I was seeing earlier.
   ```
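   
   For completeness, below is a minimal Scala sketch of setting the same overrides programmatically rather than through the EMR configuration (the values are simply the ones above). Note that `spark.driver.memory` generally has to be supplied via spark-submit or the cluster configuration, since the driver JVM is already running by the time the session is built:
   
   ```
   import org.apache.spark.sql.SparkSession

   // Sketch only: the same values as the overrides above.
   val spark = SparkSession.builder()
     .appName("Sessions_hist_load")
     .config("spark.executor.memory", "98971M")
     .config("spark.executor.cores", "1")
     // Shown for reference; in practice driver memory must be set before the
     // driver JVM starts (spark-submit / EMR configuration), not here.
     .config("spark.driver.memory", "98971M")
     .getOrCreate()
   ```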
   
   Hudi table Load Options:
   ```
   HoodieWriteConfig.TABLE_NAME -> "session_answers",
   DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
   DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.ComplexKeyGenerator",
   DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "session_dbid,question_id",
   DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "survey_dbid",
   DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
   "hoodie.bulkinsert.shuffle.parallelism" -> "20", // Initially I did not set this, but the process took even longer (~11 hrs) with a lot of containers getting killed by YARN, so I set it to 20.
   "hoodie.bulkinsert.sort.mode" -> "NONE",
   DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL
   ```
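   
   For reference, a back-of-the-envelope way to relate the shuffle parallelism to file sizing is input size divided by a target Parquet file size. The 120 MB figure below is an assumption (roughly Hudi's default maximum Parquet file size), so treat this as a rough sketch rather than a recommendation:
   
   ```
   // Rough sketch: derive a bulk-insert shuffle parallelism from the input size
   // and an assumed target Parquet file size (~120 MB).
   val inputBytes      = 5L * 1024 * 1024 * 1024   // ~5 GB of input data
   val targetFileBytes = 120L * 1024 * 1024        // assumed target file size
   val parallelism     = math.ceil(inputBytes.toDouble / targetFileBytes).toInt
   println(parallelism)                            // => 43, noticeably higher than the 20 set above
   ```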
   
   Below is the code that I am running:
   ```
   package com.lucidhq.etl.reports
   
   import org.apache.spark.sql.{ DataFrame, SaveMode, SparkSession }
   import org.apache.spark.sql.functions.{ col, concat_ws }
   
   import org.apache.hudi.{ DataSourceWriteOptions, DataSourceReadOptions }
   import org.apache.hudi.config.HoodieWriteConfig
   //import org.apache.hudi.hive.MultiPartKeysValueExtractor
   
   import org.apache.log4j.Logger
   
   object HistLoader {
   
     val sourceFilePrefix = "s3://lucid-kafka-archive/respondent.profiles.v3"
     val intermediateStagePath = "s3://lucid-datalake-lt/temp_stage/"
     val logger = Logger.getLogger(getClass.getName)
   
     var basePath = ""
     var processDate = ""
   
     def main(arg: Array[String]): Unit = {
   
       val spark = SparkSession.builder.
         appName("Sessions_hist_load").
         config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
         config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2").
         getOrCreate()
   
       spark.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
   
       processDate = arg(0)
       basePath = arg(1)
       logger.info(s"""
         processDate: ${processDate}
         basePath: ${basePath}""")
   
       val file_prefix = s"${sourceFilePrefix}/${processDate}/"
       var df_intermediate = spark.read.json(file_prefix)
       df_intermediate = df_intermediate.withColumn("answer", concat_ws("|", col("answer")))
       df_intermediate = df_intermediate.select(
         col("survey_dbid"),
         col("session_dbid"),
         col("question_id"),
         col("answer"),
         col("respondent_id"),
         col("timestamp"))
   
       // Repartition by hashing on survey_dbid instead of the default record offset,
       // and land an intermediate file so that, in case of task failure, re-computation
       // does not lead to regenerating the hashed survey_dbid partitioning from the
       // source JSON files again.
       df_intermediate.repartition(col("survey_dbid"))
         .write
         .format("parquet")
         .mode("overwrite")
         .save(intermediateStagePath)
       df_intermediate.unpersist()
   
       var df_load = spark.read.parquet(intermediateStagePath)
       df_load.show()
       val key = "session_dbid,question_id"
       val combineKey = "timestamp"
       val partitionKey = "survey_dbid"
       val tablename = "session_answers"
   
       loadHudiDataset(df_load, tablename, key, combineKey, partitionKey)
     }
   
     private def loadHudiDataset(df: DataFrame, tableName: String, key: String,
                                 combineKey: String, partitionKey: String): Unit = {
   
       val hudiOptions = Map[String, String](
         HoodieWriteConfig.TABLE_NAME -> tableName,
         DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
         DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.ComplexKeyGenerator",
         DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> key,
         DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> partitionKey,
         DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> combineKey,
         "hoodie.bulkinsert.shuffle.parallelism" -> "20",
         "hoodie.bulkinsert.sort.mode" -> "NONE")
   
       // Write a DataFrame as a Hudi dataset
       df.write
         .format("org.apache.hudi")
         .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
         .options(hudiOptions)
         .mode(SaveMode.Append)
         .save(s"${basePath}/${tableName}/")
   
     }
   
   }
   ```
   
   The Spark UI screenshots are as follows:
   
   
![image](https://user-images.githubusercontent.com/64492084/99890250-44895980-2c2b-11eb-8a34-d9e1cb3514f2.png)
   
   
![image](https://user-images.githubusercontent.com/64492084/99890251-4b17d100-2c2b-11eb-84d6-c54df9542335.png)
   
   
![image](https://user-images.githubusercontent.com/64492084/99890254-4eab5800-2c2b-11eb-9041-d819172a12d9.png)
   
   As you can see in the completed-jobs list, even though job id 10 started at 09:13 and completed in 11 mins, the next job (id 11) did not start until ~5 hrs later, i.e. at 14:16. The same gap is also visible in the event timeline screenshot.
   
   Regarding your queries on Slack, the following are my responses:
   
   1) By loading the dataset, I mean writing to the Hudi table.
   2) The number of output partitions for the execution whose log I attached is 44938 (see the rough arithmetic after the partition listings below).
   3) The number of partitions already present in the Hudi table is 298880.
   4) On average, around 12000 new partitions will be added and around 30000 will be updated every day.
   
   5) The listing of the .hoodie folder is as follows:
   ```
   [hadoop@ip-10-0-5-174 ~]$ aws s3 ls s3://lucid-datalake/mkt_hudi_tables/public/session_answers/.hoodie/
                              PRE .aux/
   2020-11-13 17:43:37          0 .aux_$folder$
   2020-11-13 17:43:37          0 .temp_$folder$
   2020-11-13 21:01:47   36023036 20201113174334.commit
   2020-11-13 17:43:41          0 20201113174334.commit.requested
   2020-11-13 17:43:41          0 20201113174334.inflight
   2020-11-14 00:04:48   33727108 20201113213152.commit
   2020-11-13 21:31:58          0 20201113213152.commit.requested
   2020-11-13 21:32:00          0 20201113213152.inflight
   2020-11-14 03:36:16   37912184 20201114004007.commit
   2020-11-14 00:40:15          0 20201114004007.commit.requested
   2020-11-14 00:40:18          0 20201114004007.inflight
   2020-11-14 06:53:05   39230472 20201114040953.commit
   2020-11-14 04:10:04          0 20201114040953.commit.requested
   2020-11-14 04:10:07          0 20201114040953.inflight
   2020-11-14 10:07:24   39733730 20201114072855.commit
   2020-11-14 07:29:07          0 20201114072855.commit.requested
   2020-11-14 07:29:11          0 20201114072855.inflight
   2020-11-14 13:19:32   41089090 20201114104035.commit
   2020-11-14 10:40:59          0 20201114104035.commit.requested
   2020-11-14 10:41:09          0 20201114104035.inflight
   2020-11-14 16:28:45   42134374 20201114135313.commit
   2020-11-14 13:53:38          0 20201114135313.commit.requested
   2020-11-14 13:53:49          0 20201114135313.inflight
   2020-11-14 19:12:32   36778779 20201114170008.commit
   2020-11-14 17:00:36          0 20201114170008.commit.requested
   2020-11-14 17:00:47          0 20201114170008.inflight
   2020-11-14 21:40:56   35306828 20201114194157.commit
   2020-11-14 19:42:26          0 20201114194157.commit.requested
   2020-11-14 19:42:37          0 20201114194157.inflight
   2020-11-15 01:43:29   39314511 20201114221051.commit
   2020-11-14 22:11:21          0 20201114221051.commit.requested
   2020-11-14 22:11:33          0 20201114221051.inflight
   2020-11-15 04:40:24   40687717 20201115022022.commit
   2020-11-15 02:20:59          0 20201115022022.commit.requested
   2020-11-15 02:21:13          0 20201115022022.inflight
   2020-11-15 10:24:44   40196023 20201115081109.commit
   2020-11-15 08:11:51          0 20201115081109.commit.requested
   2020-11-15 08:12:07          0 20201115081109.inflight
   2020-11-15 16:12:59   40939005 20201115135948.commit
   2020-11-15 14:00:30          0 20201115135948.commit.requested
   2020-11-15 14:00:47          0 20201115135948.inflight
   2020-11-15 22:13:07   41510484 20201115200055.commit
   2020-11-15 20:01:35          0 20201115200055.commit.requested
   2020-11-15 20:01:52          0 20201115200055.inflight
   2020-11-16 04:39:40   36662698 20201116021224.commit
   2020-11-16 02:13:09          0 20201116021224.commit.requested
   2020-11-16 02:13:27          0 20201116021224.inflight
   2020-11-16 12:33:33   34875744 20201116101650.commit
   2020-11-16 10:17:38          0 20201116101650.commit.requested
   2020-11-16 10:17:58          0 20201116101650.inflight
   2020-11-17 09:05:55   34875750 20201117065854.commit
   2020-11-17 06:59:46          0 20201117065854.commit.requested
   2020-11-17 07:00:10          0 20201117065854.inflight
   2020-11-17 19:35:18   34875742 20201117171409.commit
   2020-11-17 17:15:10          0 20201117171409.commit.requested
   2020-11-17 17:15:34          0 20201117171409.inflight
   2020-11-18 02:09:10   39673955 20201117234859.commit
   2020-11-17 23:49:48          0 20201117234859.commit.requested
   2020-11-17 23:50:09          0 20201117234859.inflight
   2020-11-18 09:02:40   39423992 20201118064329.commit
   2020-11-18 06:44:25          0 20201118064329.commit.requested
   2020-11-18 06:44:49          0 20201118064329.inflight
   2020-11-18 16:19:52   38855056 20201118135232.commit
   2020-11-18 13:53:30          0 20201118135232.commit.requested
   2020-11-18 13:53:55          0 20201118135232.inflight
   2020-11-18 23:38:37   37912787 20201118212218.commit
   2020-11-18 21:23:14          0 20201118212218.commit.requested
   2020-11-18 21:23:37          0 20201118212218.inflight
   2020-11-19 12:31:03   39340580 20201119063923.commit
   2020-11-19 10:58:07          0 20201119063923.commit.requested
   2020-11-19 10:58:41          0 20201119063923.inflight
   2020-11-19 10:48:53    4685681 20201119063957.rollback
   2020-11-19 10:48:53          0 20201119063957.rollback.inflight
   2020-11-19 19:08:49   35691447 20201119173726.commit
   2020-11-19 17:38:31          0 20201119173726.commit.requested
   2020-11-19 17:39:00          0 20201119173726.inflight
   2020-11-20 01:51:04   33863679 20201120000744.commit
   2020-11-20 00:08:52          0 20201120000744.commit.requested
   2020-11-20 00:09:21          0 20201120000744.inflight
   2020-11-20 09:13:19   38344846 20201120072713.commit
   2020-11-20 07:28:24          0 20201120072713.commit.requested
   2020-11-20 07:28:55          0 20201120072713.inflight
   2020-11-20 17:30:49   38464812 20201120155037.commit
   2020-11-20 15:51:56          0 20201120155037.commit.requested
   2020-11-20 15:52:28          0 20201120155037.inflight
   2020-11-21 00:40:21   37365365 20201120230924.commit
   2020-11-20 23:10:37          0 20201120230924.commit.requested
   2020-11-20 23:11:08          0 20201120230924.inflight
   2020-11-13 17:43:36          0 archived_$folder$
   2020-11-13 17:43:38        236 hoodie.properties
   [hadoop@ip-10-0-5-174 ~]$
   
   ```
   
   6) The listing of a few partitions is as follows:
   
   survey_dbid: 3324371 partition
   ```
   [hadoop@ip-10-0-5-174 ~]$ aws s3 ls s3://lucid-datalake/mkt_hudi_tables/public/session_answers/3324371/
   2020-11-20 16:31:46         93 .hoodie_partition_metadata
   2020-11-20 17:01:30     436614 8ef7f3a5-4f2b-4a23-b5dd-cdfa151aba5f-636_10-8-13577_20201120155037.parquet
   ```
   
   survey_dbid: 7885175 partition
   ```
   [hadoop@ip-10-0-5-174 ~]$ aws s3 ls s3://lucid-datalake/mkt_hudi_tables/public/session_answers/7885175/
   2020-11-14 08:31:46         93 .hoodie_partition_metadata
   2020-11-14 09:34:16     436204 1c67edad-6b51-4609-bbac-f4936e30e760-4659_3-7-13437_20201114072855.parquet
   ```
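   
   As a rough cross-check of the numbers above, the arithmetic below relates the ~5 GB input for this run to the 44938 output partitions reported in (2). This is a back-of-the-envelope sketch only, since the data is not evenly distributed across partitions:
   
   ```
   // Rough arithmetic from the figures reported in this issue.
   val inputBytes           = 5L * 1024 * 1024 * 1024   // ~5 GB input for this run
   val partitionsTouched    = 44938L                     // output partitions for this execution
   val avgBytesPerPartition = inputBytes / partitionsTouched
   println(avgBytesPerPartition)                         // ~119000 bytes, i.e. roughly 120 KB written per partition on average
   ```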

