This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 60b668fc167 [HUDI-5823][RFC-65] RFC for Partition Lifecycle Management 
(#8062)
60b668fc167 is described below

commit 60b668fc1670e61f76393888e38dd9f817f92c1b
Author: StreamingFlames <[email protected]>
AuthorDate: Fri Dec 8 10:51:43 2023 +0800

    [HUDI-5823][RFC-65] RFC for Partition Lifecycle Management (#8062)
---
 rfc/rfc-65/rfc-65.md | 227 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 227 insertions(+)

diff --git a/rfc/rfc-65/rfc-65.md b/rfc/rfc-65/rfc-65.md
new file mode 100644
index 00000000000..4e750dced1f
--- /dev/null
+++ b/rfc/rfc-65/rfc-65.md
@@ -0,0 +1,227 @@
+## Proposers
+
+- @stream2000
+- @hujincalrin
+- @huberylee
+- @YuweiXiao
+
+## Approvers
+
+## Status
+
+JIRA: [HUDI-5823](https://issues.apache.org/jira/browse/HUDI-5823)
+
+## Abstract
+
+In some classic hudi use cases, users partition hudi data by time and are only 
interested in data from a recent period
+of time. The outdated data is useless and costly, we need a TTL(Time-To-Live) 
management mechanism to prevent the
+dataset from growing infinitely.
+This proposal introduces Partition TTL Management strategies to hudi, people 
can config the strategies by table config
+directly or by call commands. With proper configs set, Hudi can find out which 
partitions are outdated and delete them.
+
+This proposal introduces Partition TTL Management service to hudi. TTL 
management is like other table services such as Clean/Compaction/Clustering.
+The user can config their ttl strategies through write configs and Hudi will 
help users find expired partitions and delete them automatically.
+
+## Background
+
+TTL management mechanism is an important feature for databases. Hudi already 
provides a `delete_partition` interface to
+delete outdated partitions. However, users still need to detect which 
partitions are outdated and
+call `delete_partition` manually, which means that users need to define and 
implement some kind of TTL strategies, find expired partitions and call call 
`delete_partition` by themselves. As the scale of installations grew, it is 
becoming increasingly important to implement a user-friendly TTL management 
mechanism for hudi.
+
+## Implementation
+
+Our main goals are as follows:
+
+* Providing an extensible framework for partition TTL management.
+* Implement a simple KEEP_BY_TIME strategy, which can be executed through 
independent Spark job, synchronous or asynchronous table services.
+
+### Strategy Definition
+
+The TTL strategies is similar to existing table service strategies. We can 
define TTL strategies like defining a clustering/clean/compaction strategy: 
+
+```properties
+hoodie.partition.ttl.strategy=KEEP_BY_TIME
+hoodie.partition.ttl.strategy.class=org.apache.hudi.table.action.ttl.strategy.KeepByTimePartitionTTLStrategy
+hoodie.partition.ttl.days.retain=10
+```
+
+The config `hoodie.partition.ttl.strategy.class` is to provide a strategy 
class (subclass of `PartitionTTLStrategy`) to get expired partition paths to 
delete. And `hoodie.partition.ttl.days.retain` is the strategy value used by 
`KeepByTimePartitionTTLStrategy` which means that we will expire partitions 
that haven't been modified for this strategy value set. We will cover the 
`KeepByTimeTTLStrategy` strategy in detail in the next section.
+
+The core definition of `PartitionTTLStrategy` looks like this: 
+
+```java
+/**
+ * Strategy for partition-level TTL management.
+ */
+public abstract class PartitionTTLStrategy {
+
+  protected final HoodieTable hoodieTable;
+  protected final HoodieWriteConfig writeConfig;
+
+  public PartitionTTLStrategy(HoodieTable hoodieTable) {
+    this.writeConfig = hoodieTable.getConfig();
+    this.hoodieTable = hoodieTable;
+  }
+  
+  /**
+   * Get expired partition paths for a specific partition ttl management 
strategy.
+   *
+   * @return Expired partition paths.
+   */
+  public abstract List<String> getExpiredPartitionPaths();
+}
+```
+
+Users can provide their own implementation of `PartitionTTLStrategy` and hudi 
will help delete the expired partitions.
+
+### KeepByTimeTTLManagementStrategy
+
+We will provide a strategy call `KeepByTimePartitionTTLStrategy` in the first 
version of partition TTL management implementation.
+
+The `KeepByTimePartitionTTLStrategy` will calculate the `lastCommitTime` for 
each input partitions. If duration between now and 'lastCommitTime' for the 
partition is larger than what `hoodie.partition.ttl.days.retain` configured, 
`KeepByTimePartitionTTLStrategy` will mark this partition as an expired 
partition. We use day as the unit of expired time since it is very common-used 
for datalakes. Open to ideas for this. 
+
+we will to use the largest commit time of committed file groups in the 
partition as the partition's
+`lastCommitTime`. So any write (including normal DMLs, clustering etc.) with 
larger instant time will change the partition's `lastCommitTime`.
+
+For file groups generated by replace commit, it may not reveal the real 
insert/update time for the file group. However, we can assume that we won't do 
clustering for a partition without new writes for a long time when using the 
strategy. And in the future, we may introduce a more accurate mechanism to get 
`lastCommitTime` of a partition, for example using metadata table. 
+
+For 1.0.0 and later hudi version which supports efficient completion time 
queries on the timeline(#9565), we can get partition's `lastCommitTime` by 
scanning the timeline and get the last write commit for the partition. Also for 
efficiency, we can store the partitions' last modified time and current 
completion time in the replace commit metadata. The next time we need to 
calculate the partitions' last modified time, we can build incrementally from 
the replace commit metadata of the last  [...]
+
+### Apply different strategies for different partitions
+
+For some specific users, they may want to apply different strategies for 
different partitions. For example, they may have multi partition 
fileds(productId, day). For partitions under `product=1` they want to keep for 
30 days while for partitions under `product=2` they want to keep for 7 days 
only. 
+
+For the first version of TTL management, we do not plan to implement a 
complicated strategy (For example, use an array to store strategies, introduce 
partition regex etc.). Instead, we add a new abstract method 
`getPartitionPathsForTTL` in `PartitionTTLStrategy` and provides a new config 
`hoodie.partition.ttl.partition.selected`. 
+
+If `hoodie.partition.ttl.partition.selected` is set, `getPartitionPathsForTTL` 
will return partitions provided by this config. If not, 
`getPartitionPathsForTTL` will return all partitions in the hudi table. 
+
+TTL strategies will only be applied for partitions return by 
`getPartitionPathsForTTL`. 
+
+Thus, if users want to apply different strategies for different partitions, 
they can do the TTL management multiple times, selecting different partitions 
and apply corresponding strategies. And we may provide a batch interface in the 
future to simplify this. 
+
+The `getPartitionPathsForTTL` method will look like this:
+
+```java
+/**
+ * Strategy for partition-level TTL management.
+ */
+public abstract class PartitionTTLStrategy {
+  protected final HoodieTable hoodieTable;
+  protected final HoodieWriteConfig writeConfig;
+
+  public PartitionTTLStrategy(HoodieTable hoodieTable) {
+    this.writeConfig = hoodieTable.getConfig();
+    this.hoodieTable = hoodieTable;
+  }
+  
+   /**
+    * Scan and list all partitions for partition TTL management.
+    *
+    * @return Partitions to apply TTL strategy
+    */
+   protected List<String> getPartitionPathsForTTL() {
+      if (StringUtils.isNullOrEmpty(writeConfig.getTTLPartitionSelected())) {
+        return getMatchedPartitions();
+      } else {
+        // Return All partition paths
+        return FSUtils.getAllPartitionPaths(hoodieTable.getContext(), 
writeConfig.getMetadataConfig(), writeConfig.getBasePath());
+      }
+   }
+}
+``` 
+
+### Executing TTL
+
+Once we already have a proper `PartitionTTLStrategy` implementation, it's easy 
to execute the ttl management. 
+
+```java
+public class SparkTTLManagementActionExecutor <T> extends 
BaseSparkCommitActionExecutor<T> {
+   @Override
+   public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+      // Construct PartitionTTLstrategy
+      PartitionTTLManagementStrategy strategy = (PartitionTTLStrategy) 
ReflectionUtils.loadClass(
+              PartitionTTLStrategy.checkAndGetPartitionTTLStrategy(config),
+              new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, 
HoodieWriteConfig.class}, table, context, config);
+
+      // Get expired partition paths
+      List<String> expiredPartitions = strategy.getExpiredPartitionPaths();
+      
+      // Delete them reusing SparkDeletePartitionCommitActionExecutor
+      return new SparkDeletePartitionCommitActionExecutor<>(context, config, 
table, instantTime, expiredPartitions).execute();
+   }
+}
+```
+
+We will add a new method `managePartitionTTL` in `HoodieTable` and 
`HoodieSparkCopyOnWriteTable` can implement it like this:
+
+```java
+@Override
+public HoodieWriteMetadata<HoodieData<WriteStatus>> 
managePartitionTTL(HoodieEngineContext context, String instantTime) {
+  return new SparkTTActionExecutor<>(context, config, this, 
instantTime).execute();
+}
+```
+
+We can call `hoodieTable.managePartitionTTL` in independent flink/spark job, 
in async/sync inline table services like clustering/compaction/clean etc.
+
+
+### User interface for Partition TTL Management
+
+We can do partition TTL management inline with streaming ingestion job or do 
it with a independent batch job, for both spark and flink engine.
+
+#### Run inline with Streaming Ingestion 
+
+Since we can run clustering inline with streaming ingestion job through the 
following config: 
+
+```properties
+hoodie.clustering.async.enabled=true
+hoodie.clustering.async.max.commits=5
+```
+
+We can do similar thing for partition TTL management. The config for async ttl 
management are: 
+
+| Config key                   | Remarks                                       
                                                                     | Default |
+|------------------------------|--------------------------------------------------------------------------------------------------------------------|---------|
+| hoodie.ttl.async.enabled     | Enable running of TTL management service, 
asynchronously as writes happen on the table.                            | 
False   |
+| hoodie.ttl.async.max.commits | Control frequency of async TTL management by 
specifying after how many commits TTL management should be triggered. | 4       
|
+
+We can easily implement async ttl management for both spark and flink engine 
since we only need to call `hoodieTable.managePartitionTTL`. And we can support 
synchronized ttl management if we want.
+
+#### Run by Independent Job
+
+Deleting a large number of partitions is a heavy operation so we may want to 
run TTL management through a independent job. We will provide a SparkSQL Call 
Command to run TTL management and it may look like this: 
+
+```sql
+call managePartitionTTL(table => 'hudi_table', strategy => 'KEEP_BY_TIME', 
daysRetain => '10', predicate => 'productid = 1');
+```
+
+The params are as follows:
+
+| Param name | Remarks                                                         
                                       | Default      |
+|------------|--------------------------------------------------------------------------------------------------------|--------------|
+| table      | The hoodie table to run partition TTL                           
                                       | empty string |
+| basePath   | The hoodie table path to run partition TTL                      
                                       | empty string |
+| strategy   | The partition TTL strategy, corresponding to a implementation 
of `PartitionTTLStrategy`                | KEEP_BY_TIME |
+| predicate  | Partition predicate for TTL, will only apply ttl strategy on 
the partitions selected by this predicate | empty string |
+
+
+Besides SparkSQL call commands, we can support run TTL management with a spark 
jar like running clustering by `HoodieClusteringJob` and run TTL with a flink 
job like `HoodieFlinkClusteringJob` in the future.
+
+### Future plan
+
+We can do a lot of things about TTL management in the future:
+
+* Support record level TTL management
+* Move the partitions to be cleaned up to cold/cheaper storage in objects 
stores instead of delete them forever
+* Stash the partitions to be cleaned up in .stashedForDeletion folder (at 
.hoodie level) and introduce some recover mechanism, for data security.
+* Support advanced ttl policies, for example wild card partition spec.`
+* ...
+
+## Rollout/Adoption Plan
+
+Hoodie Partition TTL Management V1 will support a simple KEEP_BY_TIME strategy 
at first, and others can implement their own `PartitionTTLStrategy`. 
+
+Add this feature won't affect existing functions.
+
+## Test Plan
+
+Will add UTs and ITs to test this.

Reply via email to