lijintao-by opened a new issue, #12619:
URL: https://github.com/apache/hudi/issues/12619

   We encountered the following three issues when using Hudi MOR bucketed 
tables:
   1. After loading historical data with Spark in bulk_insert mode, we 
started a Flink job in upsert mode to write incremental data. We found that 
compaction could complete only once every bucket had received data. 
Additionally, after compaction completed, each bucket 
contained only one file.
   2. When querying the Hudi table through Hive, we found that only 
compacted data was readable. When the number of buckets is large, compaction 
takes long enough to significantly delay the availability of data.
   3. After compaction, each bucket contains only one file, and historical 
files are cleaned up. This happens even though we have configured the following 
cleaning policies:
   options.put("hoodie.clean.automatic", "true");
   options.put("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS");
   options.put("hoodie.cleaner.commits.retained", "5");
   options.put("hoodie.clean.async", "true");
   We would like to get answers to the following questions:
   1. Does the compaction operation have to wait until all buckets have files 
before it can complete?
   2. Is it expected behavior that Hive can only read data after the compaction 
operation is completed?
   3. After compaction, is it expected that each bucket contains only one file? 
Is there a way to retain more historical files?
   
   To Reproduce
   Steps to reproduce:
   1. Use Spark in bulk_insert mode to write historical data into the Hudi MOR 
bucketed table.  
   2. Start Flink in upsert mode to incrementally write new data.  
   3. After incremental data is written, trigger the compaction operation.  
   4. Use Hive to query the Hudi table and find that only data after compaction 
is readable.  
   5. Check the file storage and find that only one file is retained in each 
bucket, and historical files are cleaned up.
   Flink-related parameters:
   options.put("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
   options.put("hoodie.upsert.shuffle.parallelism", "20");
   options.put("hoodie.insert.shuffle.parallelism", "20");
   options.put("write.operation", "upsert");
   options.put("write.tasks", "2");
   
   options.put("index.type","BUCKET");
   options.put("hoodie.bucket.index.num.buckets","10");
   options.put("hoodie.index.bucket.engine","SIMPLE");
   
   options.put("hoodie.clean.automatic", "true");
   options.put("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS");
   options.put("hoodie.cleaner.commits.retained", "5");
   options.put("hoodie.clean.async", "true");
   options.put("hoodie.archive.min.commits", "20");
   options.put("hoodie.archive.max.commits", "30");
   options.put("hoodie.clean.parallelism", "20");
   options.put("hoodie.archive.parallelism", "20");
                
   options.put("hoodie.compact.inline", "false");
   options.put("hoodie.compact.inline.max.delta.commits", "1");
   options.put("hoodie.compact.schedule.inline", "true");
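
   To make the bucket layout concrete, here is a minimal sketch of how a hash 
bucket index with a fixed bucket count could assign record keys to buckets. It 
only illustrates why a small incremental batch may leave some of the 10 
configured buckets without new data. The class name, method names, sample keys, 
and the hash function are assumptions for illustration and are not taken from 
Hudi's implementation.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class BucketSketch {
    // Illustrative only: maps a record key to one of numBuckets buckets.
    // The hash function here is an assumption, not necessarily what Hudi uses.
    public static int bucketId(String recordKey, int numBuckets) {
        return (recordKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
    }

    // Returns the distinct bucket ids touched by a batch of record keys.
    public static Set<Integer> touchedBuckets(Iterable<String> keys, int numBuckets) {
        Set<Integer> touched = new HashSet<>();
        for (String key : keys) {
            touched.add(bucketId(key, numBuckets));
        }
        return touched;
    }

    public static void main(String[] args) {
        int numBuckets = 10; // same value as hoodie.bucket.index.num.buckets
        List<String> batch = List.of("id-1", "id-2", "id-3"); // hypothetical keys
        Set<Integer> touched = touchedBuckets(batch, numBuckets);
        System.out.println("buckets with new data: " + touched.size() + " of " + numBuckets);
    }
}
```

   A batch of three keys can touch at most three buckets, so the remaining 
buckets receive no incremental data from that batch.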
   
   
   Expected behavior
   1. Can the compaction operation be executed without relying on all buckets 
having files? Is there any configuration to optimize this behavior?  
   2. Is it possible for Hive to read incremental data from the Hudi table 
without waiting for the compaction to be completed?  
   3. After the compaction is completed, is it possible to retain more 
historical files instead of having only one file per bucket?  
   4. Separately, we have a table with 3 billion records and 300 buckets. The 
Flink job runs normally, but the compaction stays in the "INFLIGHT" state.
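
   For a sense of scale in item 4, here is a back-of-the-envelope estimate of 
the average number of records per bucket. It assumes records are spread evenly 
across buckets, which real key distributions may not satisfy; the class and 
method names are hypothetical.

```java
class BucketSizeEstimate {
    // Average records per bucket, assuming an even spread of keys (an assumption).
    public static long averageRecordsPerBucket(long totalRecords, int numBuckets) {
        return totalRecords / numBuckets;
    }

    public static void main(String[] args) {
        // 3 billion records and 300 buckets, as described in item 4.
        long perBucket = averageRecordsPerBucket(3_000_000_000L, 300);
        System.out.println(perBucket); // prints 10000000
    }
}
```

   Under that assumption, each bucket holds roughly 10 million records.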
   
   Environment Description
   ● Hudi version: 0.14.0
   ● Spark version: 3.2.1
   ● Hive version: 3.1.2
   ● Hadoop version: 3.2.2
   ● Storage: HDFS
   ● Running on Docker?: No
   
   Additional context
   In the scenario where historical data is written using Spark and 
incremental data is written using Flink, the following features are required:
   1. Faster visibility of incremental data to reduce data latency.  
   2. Retention of multiple historical files after compaction to enable more 
flexible historical queries and failure recovery.
   
   Stacktrace
   There are no specific error logs; the issue is a question about functional 
behavior.
   

