machadoluiz opened a new issue, #8824:
URL: https://github.com/apache/hudi/issues/8824

   **Describe the problem you faced**
   
   Our project performs a full load every day and must retain each of these 
versions for future queries. We set up Hudi to keep 6 years of data with the 
following configuration:
   
   ```python
   "hoodie.cleaner.policy": "KEEP_LATEST_BY_HOURS",
   "hoodie.cleaner.hours.retained": "52560", # 24 hours * 365 days * 6 years
   ```
   
   After about 30 runs, we observed that data integrity was compromised: on 
read, versions of the data get mixed up and duplicate records appear. This 
causes significant issues in our data lake (S3), since these tables are used 
by other scripts.
   
   To work around these problems, we adjusted the maximum and minimum number 
of commits to keep, applying the following configuration, as referenced in 
[#7600](https://github.com/apache/hudi/issues/7600#issuecomment-1411949976):
   
   ```python
   "hoodie.keep.max.commits": "2300", # (365 days * 6 years) + delta
   "hoodie.keep.min.commits": "2200", # (365 days * 6 years) + delta2
   ```
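
As a rough sketch, the values in the two snippets above can be derived from the desired history length instead of being hard-coded. The margins `min_margin` and `max_margin` are hypothetical stand-ins for "delta2" and "delta", chosen only so that the results match the numbers quoted in this issue.

```python
# Sketch: derive the retention settings from the desired history length.
YEARS_RETAINED = 6
DAYS_PER_YEAR = 365  # leap days ignored, as in the comments above

hours_retained = 24 * DAYS_PER_YEAR * YEARS_RETAINED  # cleaner window, in hours
daily_commits = DAYS_PER_YEAR * YEARS_RETAINED        # one full load per day

# Hypothetical margins standing in for "delta2" and "delta".
min_margin, max_margin = 10, 110

retention_options = {
    "hoodie.cleaner.policy": "KEEP_LATEST_BY_HOURS",
    "hoodie.cleaner.hours.retained": str(hours_retained),
    "hoodie.keep.min.commits": str(daily_commits + min_margin),
    "hoodie.keep.max.commits": str(daily_commits + max_margin),
}

for key, value in retention_options.items():
    print(f"{key} = {value}")
```

With these assumptions, 6 years of daily loads correspond to 2190 commits, so both commit thresholds sit only slightly above that count.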
   
   However, this workaround becomes excessively costly over time. We simulated 
a year of daily runs on a small table, partitioned by day, and both the 
difference and the write cost grew significantly: the average runtime of a 
script went from 25 s to 2 min 30 s, a sixfold increase. Since we need to keep 
6 years of history, we expect the processing time to grow even further.
   
   **To reproduce**
   
   Follow the instructions below to reproduce the behavior:
   
   1. Create the example dataframe:
   
   ```python
   from pyspark.sql import Row

   # `spark` is assumed to be an existing SparkSession
   data = [
       Row(SK=-6698625589789238999, DSC='A', COD=1), 
       Row(SK=8420071140774656230,  DSC='B', COD=2), 
       Row(SK=-8344648708406692296, DSC='C', COD=4), 
       Row(SK=504019808641096632,   DSC='D', COD=5), 
       Row(SK=-233500712460350175,  DSC='E', COD=6), 
       Row(SK=2786828215451145335,  DSC='F', COD=7), 
       Row(SK=-8285521376477742517, DSC='G', COD=8), 
       Row(SK=-2852032610340310743, DSC='H', COD=9), 
       Row(SK=-188596373586653926,  DSC='I', COD=10), 
       Row(SK=890099540967675307,   DSC='J', COD=11), 
       Row(SK=72738756111436295,    DSC='K', COD=12), 
       Row(SK=6122947679528380961,  DSC='L', COD=13), 
       Row(SK=-3715488255824917081, DSC='M', COD=14), 
       Row(SK=7553013721279796958,  DSC='N', COD=15)
   ]
   dataframe = spark.createDataFrame(data)
   ```
   
   2. With the following Hudi configuration:
   
   ```python
   hudi_options = {
       "hoodie.table.name": "example_hudi",
       "hoodie.datasource.write.recordkey.field": "SK",
       "hoodie.datasource.write.table.name": "example_hudi",
       "hoodie.datasource.write.operation": "insert_overwrite_table",
       "hoodie.datasource.write.partitionpath.field": "LOAD_DATE",
       "hoodie.datasource.hive_sync.database": "default",
       "hoodie.datasource.hive_sync.table": "example_hudi",
       "hoodie.datasource.hive_sync.partition_fields": "LOAD_DATE",
       "hoodie.cleaner.policy": "KEEP_LATEST_BY_HOURS",
       "hoodie.cleaner.hours.retained": "52560",
       "hoodie.keep.max.commits": "2300", 
       "hoodie.keep.min.commits":"2200",  
       "hoodie.datasource.write.precombine.field":"",
       "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
       "hoodie.datasource.hive_sync.enable":"true",
       "hoodie.datasource.hive_sync.use_jdbc":"false",
       "hoodie.datasource.hive_sync.mode":"hms",
   }
   ```
   
   3. Now, write the date range:
   
   ```python
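   from datetime import datetime, timedelta
   from pyspark.sql.functions import lit, to_date

   # `basePath` (the table's base path) is assumed to be defined elsewhere
   date = datetime.strptime('2023-06-02', '%Y-%m-%d')  # Initial date (yyyy-mm-dd)
   final_date = datetime.strptime('2023-11-01', '%Y-%m-%d')  # Final date (yyyy-mm-dd)
   while date <= final_date:
       dataframe = dataframe.withColumn("LOAD_DATE", to_date(lit(date.strftime('%Y-%m-%d'))))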
       dataframe.write.format("hudi"). \
           options(**hudi_options). \
           mode("append"). \
           save(basePath)
       date += timedelta(days=1)
   ```
   
   4. After this, compare the time taken by each load to see the progressive 
growth. If the increase continues at this rate, load times will become 
unmanageable, since our real tables are much larger than this example.
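
One possible way to collect the per-load timings for step 4 is sketched below. The helper names `daily_dates` and `time_loads` are hypothetical, and `fake_write` is a stand-in so the sketch runs without Spark; in the real reproduction it would be replaced by a function that performs the Hudi write from step 3 for the given date.

```python
import time
from datetime import date, timedelta


def daily_dates(start, end):
    """Yield every date from start to end, inclusive."""
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)


def time_loads(write_one_day, start, end):
    """Call write_one_day(d) for each date and return (date, seconds) pairs."""
    timings = []
    for d in daily_dates(start, end):
        t0 = time.perf_counter()
        write_one_day(d)
        timings.append((d, time.perf_counter() - t0))
    return timings


def fake_write(load_date):
    """Stand-in for the Hudi write in step 3 (assumption: does nothing)."""
    pass


timings = time_loads(fake_write, date(2023, 6, 2), date(2023, 11, 1))
print(len(timings), "loads timed")
```

The date range used in step 3 covers 153 daily loads, well past the roughly 30 runs after which the duplicates were first observed.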
   
   **Expected behavior**
   
   We expected:
   
   * No duplicate records would appear after 30 commits.
   * Execution time would not increase significantly over time.
   * Metadata would follow the behavior determined by the 
`hoodie.cleaner.policy` value `KEEP_LATEST_BY_HOURS`.
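
The first expectation can be checked mechanically. The sketch below is plain Python and assumes the rows have already been fetched as dictionaries with `SK` and `LOAD_DATE` keys; the function name `duplicated_keys` is hypothetical. On a table of realistic size the same grouping would more sensibly be done in Spark itself.

```python
from collections import Counter


def duplicated_keys(rows, key="SK", partition="LOAD_DATE"):
    """Return {(partition value, key value): count} for keys seen more than once."""
    counts = Counter((row[partition], row[key]) for row in rows)
    return {pair: n for pair, n in counts.items() if n > 1}


# Tiny illustrative sample (made-up values, not data from the issue).
sample_rows = [
    {"SK": 1, "LOAD_DATE": "2023-06-02"},
    {"SK": 1, "LOAD_DATE": "2023-06-02"},  # same key twice in one partition
    {"SK": 2, "LOAD_DATE": "2023-06-02"},
    {"SK": 1, "LOAD_DATE": "2023-06-03"},  # same key, different load date
]

print(duplicated_keys(sample_rows))
```

A key that repeats across different `LOAD_DATE` partitions is expected here, since every daily load rewrites the full dataset; only repeats within one partition count as duplicates.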
   
   **Environment description**
   
   * Hudi Version: 0.12.2
   * Spark Version: 3.3.1
   * Hive Version: 3.1.3
   * Hadoop Version: Not used
   * Storage (HDFS/S3/GCS..): S3 (EMRFS)
   * Running on Docker? (yes/no): No, running on AWS EMR

