This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 89d267a4bf4 [HUDI-5823] Partition ttl management (#9723)
89d267a4bf4 is described below

commit 89d267a4bf49692297a1c97b97ea71bfa2f4ee74
Author: stream2000 <[email protected]>
AuthorDate: Wed Feb 28 09:02:32 2024 +0800

    [HUDI-5823] Partition ttl management (#9723)
---
 .../hudi/client/BaseHoodieTableServiceClient.java  |  17 ++
 .../org/apache/hudi/config/HoodieTTLConfig.java    | 130 ++++++++++++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  33 ++++
 .../java/org/apache/hudi/table/HoodieTable.java    |   8 +
 .../HoodiePartitionTTLStrategyFactory.java         |  89 ++++++++++
 .../ttl/strategy/KeepByCreationTimeStrategy.java   |  54 ++++++
 .../action/ttl/strategy/KeepByTimeStrategy.java    | 110 ++++++++++++
 .../action/ttl/strategy/PartitionTTLStrategy.java  |  77 +++++++++
 .../ttl/strategy/PartitionTTLStrategyType.java     |  75 ++++++++
 .../table/action/ttl/strategy/TTLStrategy.java     |  25 +++
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |   5 +
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     |   5 +
 .../apache/hudi/client/SparkRDDWriteClient.java    |   8 +
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |  12 ++
 .../commit/SparkPartitionTTLActionExecutor.java    |  59 +++++++
 .../hudi/client/TestPartitionTTLManagement.java    | 188 +++++++++++++++++++++
 .../table/timeline/HoodieInstantTimeGenerator.java |   2 +-
 .../hudi/command/procedures/HoodieProcedures.scala |   1 +
 .../hudi/command/procedures/RunTTLProcedure.scala  |  99 +++++++++++
 .../sql/hudi/procedure/TestTTLProcedure.scala      | 117 +++++++++++++
 .../org/apache/hudi/utilities/HoodieTTLJob.java    | 131 ++++++++++++++
 21 files changed, 1244 insertions(+), 1 deletion(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 967aaa4f68e..8e89597775d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -484,6 +484,17 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
     return false;
   }
 
  /**
   * Delete expired partitions, as determined by the configured partition ttl strategy.
   *
   * <p>Builds a fresh {@link HoodieTable} from the current write config and delegates the
   * actual work to the table's partition ttl action.
   *
   * @param instantTime Instant Time for the action
   * @return HoodieWriteMetadata describing the replace commit produced by the ttl action
   */
  public HoodieWriteMetadata<T> managePartitionTTL(String instantTime) {
    HoodieTable<?, I, ?, T> table = createTable(config, context.getHadoopConf().get());
    return table.managePartitionTTL(context, instantTime);
  }
+
   protected abstract void validateClusteringCommit(HoodieWriteMetadata<O> 
clusteringMetadata, String clusteringCommitTime, HoodieTable table);
 
   protected abstract HoodieWriteMetadata<O> 
convertToOutputMetadata(HoodieWriteMetadata<T> writeMetadata);
@@ -583,6 +594,12 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
       
metadata.addMetadata(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), 
"true");
       inlineScheduleClustering(extraMetadata);
     }
+
+    //  Do an inline partition ttl management if enabled
+    if (config.isInlinePartitionTTLEnable()) {
+      String instantTime = createNewInstantTime();
+      table.managePartitionTTL(table.getContext(), instantTime);
+    }
   }
 
   /**
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieTTLConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieTTLConfig.java
new file mode 100644
index 00000000000..1f9a4e40e98
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieTTLConfig.java
@@ -0,0 +1,130 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.config;

import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.table.action.ttl.strategy.PartitionTTLStrategyType;

import javax.annotation.concurrent.Immutable;

import java.util.Properties;

/**
 * Hoodie Configs for partition/record level ttl management.
 */
@Immutable
@ConfigClassProperty(name = "TTL management Configs",
    groupName = ConfigGroups.Names.WRITE_CLIENT,
    description = "Data ttl management")
public class HoodieTTLConfig extends HoodieConfig {

  /** Common key prefix for all partition ttl strategy configs. */
  public static final String PARTITION_TTL_STRATEGY_PARAM_PREFIX = "hoodie.partition.ttl.strategy.";

  /** Fully-qualified class name of the default KEEP_BY_TIME strategy. */
  public static final String KEEP_BY_TIME_PARTITION_TTL_STRATEGY =
      "org.apache.hudi.table.action.ttl.strategy.KeepByTimeStrategy";

  public static final ConfigProperty<Boolean> INLINE_PARTITION_TTL = ConfigProperty
      .key("hoodie.partition.ttl.inline")
      .defaultValue(false)
      .sinceVersion("1.0.0")
      .markAdvanced()
      // NOTE: fixed typo in user-facing documentation ("exipired" -> "expired")
      .withDocumentation("When enabled, the partition ttl management service is invoked immediately after each commit, "
          + "to delete expired partitions");

  public static final ConfigProperty<String> PARTITION_TTL_STRATEGY_CLASS_NAME = ConfigProperty
      .key("hoodie.partition.ttl.strategy.class")
      .noDefaultValue()
      .sinceVersion("1.0.0")
      .markAdvanced()
      .withDocumentation("Config to provide a strategy class (subclass of PartitionTTLStrategy) to get the expired partitions");

  public static final ConfigProperty<String> PARTITION_TTL_STRATEGY_TYPE = ConfigProperty
      .key("hoodie.partition.ttl.management.strategy.type")
      .defaultValue(PartitionTTLStrategyType.KEEP_BY_TIME.name())
      .sinceVersion("1.0.0")
      .markAdvanced()
      .withDocumentation("Partition ttl management strategy type to determine the strategy class");

  public static final ConfigProperty<Integer> DAYS_RETAIN = ConfigProperty
      .key(PARTITION_TTL_STRATEGY_PARAM_PREFIX + "days.retain")
      .defaultValue(-1)
      .sinceVersion("1.0.0")
      .markAdvanced()
      .withDocumentation("Partition ttl management KEEP_BY_TIME strategy days retain");

  public static final ConfigProperty<String> PARTITION_SELECTED = ConfigProperty
      .key(PARTITION_TTL_STRATEGY_PARAM_PREFIX + "partition.selected")
      .noDefaultValue()
      .markAdvanced()
      .sinceVersion("1.0.0")
      .withDocumentation("Partitions to manage ttl");

  public static final ConfigProperty<Integer> MAX_PARTITION_TO_DELETE = ConfigProperty
      .key(PARTITION_TTL_STRATEGY_PARAM_PREFIX + "max.delete.partitions")
      .defaultValue(1000)
      .markAdvanced()
      .sinceVersion("1.0.0")
      .withDocumentation("max partitions to delete in partition ttl management");

  /** Fluent builder for {@link HoodieTTLConfig}. */
  public static class Builder {
    private final HoodieTTLConfig ttlConfig = new HoodieTTLConfig();

    /** Comma-separated list of partitions ttl management should be restricted to. */
    public HoodieTTLConfig.Builder withTTLPartitionSelected(String partitionSelected) {
      ttlConfig.setValue(PARTITION_SELECTED, partitionSelected);
      return this;
    }

    /** Retention in days for the KEEP_BY_TIME strategy; non-positive disables ttl. */
    public HoodieTTLConfig.Builder withTTLDaysRetain(Integer daysRetain) {
      ttlConfig.setValue(DAYS_RETAIN, daysRetain.toString());
      return this;
    }

    /** Whether to run partition ttl management inline after each commit. */
    public HoodieTTLConfig.Builder enableInlinePartitionTTL(Boolean enable) {
      ttlConfig.setValue(INLINE_PARTITION_TTL, enable.toString());
      return this;
    }

    /** Explicit strategy class; takes precedence over the strategy type. */
    public HoodieTTLConfig.Builder withTTLStrategyClass(String clazz) {
      ttlConfig.setValue(PARTITION_TTL_STRATEGY_CLASS_NAME, clazz);
      return this;
    }

    /** Strategy type used to pick the strategy class when no explicit class is set. */
    public HoodieTTLConfig.Builder withTTLStrategyType(PartitionTTLStrategyType ttlStrategyType) {
      ttlConfig.setValue(PARTITION_TTL_STRATEGY_TYPE, ttlStrategyType.name());
      return this;
    }

    /** Seeds the builder from pre-existing properties. */
    public HoodieTTLConfig.Builder fromProperties(Properties props) {
      this.ttlConfig.getProps().putAll(props);
      return this;
    }

    /** Finalizes the config, filling in defaults for unset keys. */
    public HoodieTTLConfig build() {
      ttlConfig.setDefaults(HoodieTTLConfig.class.getName());
      return ttlConfig;
    }
  }

  public static Builder newBuilder() {
    return new Builder();
  }

}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 8fd3546671e..f4cb386d271 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2697,6 +2697,29 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getWriteConcurrencyMode().isNonBlockingConcurrencyControl();
   }
 
  /**
   * TTL configs.
   */

  /** @return true if partition ttl management should run inline after each commit. */
  public boolean isInlinePartitionTTLEnable() {
    return getBoolean(HoodieTTLConfig.INLINE_PARTITION_TTL);
  }

  /** @return explicit partition ttl strategy class name, or null when only a type is configured. */
  public String getPartitionTTLStrategyClassName() {
    return getString(HoodieTTLConfig.PARTITION_TTL_STRATEGY_CLASS_NAME);
  }

  /** @return retention in days for the KEEP_BY_TIME strategy (default -1 = disabled). */
  public Integer getPartitionTTLStrategyDaysRetain() {
    return getInt(HoodieTTLConfig.DAYS_RETAIN);
  }

  /** @return comma-separated partitions ttl is restricted to, or null for all partitions. */
  public String getPartitionTTLPartitionSelected() {
    return getString(HoodieTTLConfig.PARTITION_SELECTED);
  }

  /** @return cap on the number of partitions a single ttl run may delete. */
  public Integer getPartitionTTLMaxPartitionsToDelete() {
    return getInt(HoodieTTLConfig.MAX_PARTITION_TO_DELETE);
  }
+
   public static class Builder {
 
     protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -2716,6 +2739,8 @@ public class HoodieWriteConfig extends HoodieConfig {
     private boolean isCallbackConfigSet = false;
     private boolean isPayloadConfigSet = false;
     private boolean isMetadataConfigSet = false;
+
+    private boolean isTTLConfigSet = false;
     private boolean isLockConfigSet = false;
     private boolean isPreCommitValidationConfigSet = false;
     private boolean isMetricsJmxConfigSet = false;
@@ -2995,6 +3020,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withTTLConfig(HoodieTTLConfig ttlConfig) {
+      writeConfig.getProps().putAll(ttlConfig.getProps());
+      isTTLConfigSet = true;
+      return this;
+    }
+
     public Builder withAutoCommit(boolean autoCommit) {
       writeConfig.setValue(AUTO_COMMIT_ENABLE, String.valueOf(autoCommit));
       return this;
@@ -3262,6 +3293,8 @@ public class HoodieWriteConfig extends HoodieConfig {
       final boolean isLockProviderPropertySet = 
writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key());
       writeConfig.setDefaultOnCondition(!isLockConfigSet,
           
HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
+      writeConfig.setDefaultOnCondition(!isTTLConfigSet,
+          
HoodieTTLConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
 
       autoAdjustConfigsForConcurrencyMode(isLockProviderPropertySet);
     }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 080fe5f357d..3b78fb09090 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -291,6 +291,14 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
    */
   public abstract HoodieWriteMetadata<O> 
insertOverwriteTable(HoodieEngineContext context, String instantTime, I 
records);
 
  /**
   * Delete expired partitions, as determined by the configured partition ttl strategy.
   *
   * @param context HoodieEngineContext
   * @param instantTime Instant Time for the action
   * @return HoodieWriteMetadata for the replace commit that drops the expired partitions
   */
  public abstract HoodieWriteMetadata<O> managePartitionTTL(HoodieEngineContext context, String instantTime);
+
   public HoodieWriteConfig getConfig() {
     return config;
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/HoodiePartitionTTLStrategyFactory.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/HoodiePartitionTTLStrategyFactory.java
new file mode 100644
index 00000000000..26bcaa9fe51
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/HoodiePartitionTTLStrategyFactory.java
@@ -0,0 +1,89 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.table.action.ttl.strategy;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieTTLConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Locale;

import static org.apache.hudi.config.HoodieTTLConfig.PARTITION_TTL_STRATEGY_CLASS_NAME;
import static org.apache.hudi.config.HoodieTTLConfig.PARTITION_TTL_STRATEGY_TYPE;

/**
 * Factory help to create {@link PartitionTTLStrategy}.
 * <p>
 * This factory will try {@link HoodieTTLConfig#PARTITION_TTL_STRATEGY_CLASS_NAME} firstly,
 * this ensures the class prop will not be overwritten by {@link PartitionTTLStrategyType}
 */
public class HoodiePartitionTTLStrategyFactory {

  private static final Logger LOG = LoggerFactory.getLogger(HoodiePartitionTTLStrategyFactory.class);

  /**
   * Instantiates the configured {@link PartitionTTLStrategy} via reflection.
   *
   * @param hoodieTable table the strategy operates on
   * @param props       write properties carrying the strategy class/type config
   * @param instantTime instant time of the ttl action
   * @throws IOException if the strategy class cannot be loaded or instantiated
   */
  public static PartitionTTLStrategy createStrategy(HoodieTable hoodieTable, TypedProperties props, String instantTime) throws IOException {
    String strategyClassName = resolveStrategyClassName(props);
    try {
      return (PartitionTTLStrategy) ReflectionUtils.loadClass(strategyClassName,
          new Class<?>[] {HoodieTable.class, String.class}, hoodieTable, instantTime);
    } catch (Throwable t) {
      throw new IOException("Could not load partition ttl management strategy class " + strategyClassName, t);
    }
  }

  /**
   * Resolves the strategy class: an explicitly configured class wins; otherwise the
   * configured (or default) strategy type is mapped to its class name.
   */
  private static String resolveStrategyClassName(TypedProperties props) {
    String explicitClassName = props.getString(PARTITION_TTL_STRATEGY_CLASS_NAME.key(), null);
    if (!StringUtils.isNullOrEmpty(explicitClassName)) {
      return explicitClassName;
    }
    String configuredType = props.getString(PARTITION_TTL_STRATEGY_TYPE.key(),
        PARTITION_TTL_STRATEGY_TYPE.defaultValue());
    PartitionTTLStrategyType resolvedType;
    try {
      resolvedType = PartitionTTLStrategyType.valueOf(configuredType.toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new HoodieException("Unsupported PartitionTTLStrategy Type " + configuredType);
    }
    return getPartitionTTLStrategyFromType(resolvedType);
  }

  /**
   * @param type {@link PartitionTTLStrategyType} enum.
   * @return The partition ttl management strategy class name based on the {@link PartitionTTLStrategyType}.
   */
  public static String getPartitionTTLStrategyFromType(PartitionTTLStrategyType type) {
    switch (type) {
      case KEEP_BY_TIME:
        return KeepByTimeStrategy.class.getName();
      case KEEP_BY_CREATION_TIME:
        return KeepByCreationTimeStrategy.class.getName();
      default:
        throw new HoodieException("Unsupported PartitionTTLStrategy Type " + type);
    }
  }

}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByCreationTimeStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByCreationTimeStrategy.java
new file mode 100644
index 00000000000..a350086f2dc
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByCreationTimeStrategy.java
@@ -0,0 +1,54 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.table.action.ttl.strategy;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.HoodieTable;

import java.util.List;
import java.util.stream.Collectors;

/**
 * Partition ttl strategy that expires partitions by their creation commit time,
 * as recorded in the partition metadata file, rather than the last commit time.
 */
public class KeepByCreationTimeStrategy extends KeepByTimeStrategy {

  public KeepByCreationTimeStrategy(HoodieTable hoodieTable, String instantTime) {
    super(hoodieTable, instantTime);
  }

  /**
   * Keeps only partitions whose recorded creation commit time has exceeded the ttl.
   * Partitions without a readable creation time are never considered expired.
   */
  @Override
  protected List<String> getExpiredPartitionsForTimeStrategy(List<String> partitionPathsForTTL) {
    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
    return partitionPathsForTTL.parallelStream()
        .filter(partitionPath -> {
          HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(
              metaClient.getFs(), FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPath));
          Option<String> createdCommitTime = partitionMetadata.readPartitionCreatedCommitTime();
          return createdCommitTime.isPresent() && isPartitionExpired(createdCommitTime.get());
        })
        .collect(Collectors.toList());
  }

}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java
new file mode 100644
index 00000000000..b6d67bb9e8a
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java
@@ -0,0 +1,110 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.table.action.ttl.strategy;

import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.table.HoodieTable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.fixInstantTimeCompatibility;
import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.instantTimePlusMillis;

/**
 * KeepByTimeStrategy will return expired partitions by their lastCommitTime.
 */
public class KeepByTimeStrategy extends PartitionTTLStrategy {

  private static final Logger LOG = LoggerFactory.getLogger(KeepByTimeStrategy.class);

  // Retention converted to milliseconds; values <= 0 disable ttl management.
  protected final long ttlInMilis;

  public KeepByTimeStrategy(HoodieTable hoodieTable, String instantTime) {
    super(hoodieTable, instantTime);
    // BUGFIX: use a long literal so the multiplication is done in long arithmetic.
    // days * 1000 * 3600 * 24 evaluated as int overflows for any retention > 24 days
    // (25 * 86_400_000 > Integer.MAX_VALUE), silently corrupting the ttl.
    this.ttlInMilis = writeConfig.getPartitionTTLStrategyDaysRetain() * 1000L * 3600 * 24;
  }

  /**
   * Returns the partitions considered expired, capped by the configured max-delete limit.
   * Returns an empty list when the table has no completed commit, ttl is disabled,
   * or the table is non-partitioned.
   */
  @Override
  public List<String> getExpiredPartitionPaths() {
    Option<HoodieInstant> lastCompletedInstant = hoodieTable.getActiveTimeline().filterCompletedInstants().lastInstant();
    if (!lastCompletedInstant.isPresent() || ttlInMilis <= 0
        || !hoodieTable.getMetaClient().getTableConfig().getPartitionFields().isPresent()) {
      return Collections.emptyList();
    }
    List<String> expiredPartitions = getExpiredPartitionsForTimeStrategy(getPartitionPathsForTTL());
    int limit = writeConfig.getPartitionTTLMaxPartitionsToDelete();
    LOG.info("Total expired partitions count {}, limit {}", expiredPartitions.size(), limit);
    return expiredPartitions.stream()
        .limit(limit) // Avoid a single replace commit too large
        .collect(Collectors.toList());
  }

  /**
   * Filters the candidate partitions down to those whose last commit time exceeded the ttl.
   * Partitions with no resolvable last commit time are kept (never expired).
   */
  protected List<String> getExpiredPartitionsForTimeStrategy(List<String> partitionsForTTLManagement) {
    HoodieTimer timer = HoodieTimer.start();
    Map<String, Option<String>> lastCommitTimeForPartitions = getLastCommitTimeForPartitions(partitionsForTTLManagement);
    LOG.info("Collect last commit time for partitions cost {} ms", timer.endTimer());
    return lastCommitTimeForPartitions.entrySet()
        .stream()
        .filter(entry -> entry.getValue().isPresent())
        .filter(entry -> isPartitionExpired(entry.getValue().get()))
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
  }

  /**
   * Collects, per partition, the latest base-instant time of its file slices at {@code instantTime}.
   *
   * @param partitionPaths Partitions to collect stats.
   */
  private Map<String, Option<String>> getLastCommitTimeForPartitions(List<String> partitionPaths) {
    // Guard: engine-context map with parallelism 0 is undefined for an empty input.
    if (partitionPaths.isEmpty()) {
      return Collections.emptyMap();
    }
    int statsParallelism = Math.min(partitionPaths.size(), 200);
    return hoodieTable.getContext().map(partitionPaths, partitionPath -> {
      Option<String> partitionLastModifiedTime = hoodieTable.getHoodieView()
          .getLatestFileSlicesBeforeOrOn(partitionPath, instantTime, true)
          .map(FileSlice::getBaseInstantTime)
          .max(Comparator.naturalOrder())
          .map(Option::ofNullable)
          .orElse(Option.empty());
      return Pair.of(partitionPath, partitionLastModifiedTime);
    }, statsParallelism).stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue));
  }

  /**
   * Determines if a partition's reference time has exceeded its time-to-live (TTL).
   * <p>
   * This method checks if the current time has passed the TTL threshold based on a
   * reference time, which could be the creation time or the last commit time of the partition.
   *
   * @param referenceTime last commit time or creation time for partition
   */
  protected boolean isPartitionExpired(String referenceTime) {
    String expiredTime = instantTimePlusMillis(fixInstantTimeCompatibility(referenceTime), ttlInMilis);
    return fixInstantTimeCompatibility(instantTime).compareTo(expiredTime) > 0;
  }

}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategy.java
new file mode 100644
index 00000000000..47768870930
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategy.java
@@ -0,0 +1,77 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.table.action.ttl.strategy;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

/**
 * Strategy for partition-level ttl management.
 */
public abstract class PartitionTTLStrategy implements TTLStrategy, Serializable {

  private static final Logger LOG = LoggerFactory.getLogger(PartitionTTLStrategy.class);

  protected final HoodieTable hoodieTable;
  protected final HoodieWriteConfig writeConfig;
  // Instant time of the ttl action; used as the upper bound when inspecting file slices.
  protected final String instantTime;

  public PartitionTTLStrategy(HoodieTable hoodieTable, String instantTime) {
    this.writeConfig = hoodieTable.getConfig();
    this.hoodieTable = hoodieTable;
    this.instantTime = instantTime;
  }

  /**
   * Get expired partition paths for a specific partition ttl strategy.
   *
   * @return Expired partition paths.
   */
  public abstract List<String> getExpiredPartitionPaths();

  /**
   * Scan and list all partitions for partition ttl management.
   *
   * @return all partitions paths for the dataset, or only the explicitly selected
   *         partitions when {@code hoodie.partition.ttl.strategy.partition.selected} is set.
   */
  protected List<String> getPartitionPathsForTTL() {
    // BUGFIX: read the ttl-specific partition selection. The original code read
    // getClusteringPartitionSelected(), so the PARTITION_SELECTED ttl config was
    // silently ignored and the clustering selection leaked into ttl management.
    String partitionSelected = writeConfig.getPartitionTTLPartitionSelected();
    HoodieTimer timer = HoodieTimer.start();
    List<String> partitionsForTTL;
    if (StringUtils.isNullOrEmpty(partitionSelected)) {
      // Return All partition paths
      partitionsForTTL = FSUtils.getAllPartitionPaths(hoodieTable.getContext(), writeConfig.getMetadataConfig(), writeConfig.getBasePath());
    } else {
      partitionsForTTL = Arrays.asList(partitionSelected.split(","));
    }
    LOG.info("Get partitions for ttl cost {} ms", timer.endTimer());
    return partitionsForTTL;
  }

}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategyType.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategyType.java
new file mode 100644
index 00000000000..6dfbcd6d0e5
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategyType.java
@@ -0,0 +1,75 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.table.action.ttl.strategy;

import org.apache.hudi.common.config.HoodieConfig;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.apache.hudi.config.HoodieTTLConfig.PARTITION_TTL_STRATEGY_CLASS_NAME;
import static org.apache.hudi.config.HoodieTTLConfig.PARTITION_TTL_STRATEGY_TYPE;

/**
 * Types of {@link PartitionTTLStrategy}.
 */
public enum PartitionTTLStrategyType {
  KEEP_BY_TIME("org.apache.hudi.table.action.ttl.strategy.KeepByTimeStrategy"),
  KEEP_BY_CREATION_TIME("org.apache.hudi.table.action.ttl.strategy.KeepByCreationTimeStrategy");

  private final String className;

  PartitionTTLStrategyType(String className) {
    this.className = className;
  }

  public String getClassName() {
    return className;
  }

  /**
   * Reverse lookup: resolves the enum constant from its strategy class name.
   *
   * @throws IllegalArgumentException if no constant maps to the given class name
   */
  public static PartitionTTLStrategyType fromClassName(String className) {
    for (PartitionTTLStrategyType type : PartitionTTLStrategyType.values()) {
      if (type.getClassName().equals(className)) {
        return type;
      }
    }
    throw new IllegalArgumentException("No PartitionTTLStrategyType found for class name: " + className);
  }

  /** @return the names of all strategy types, for config docs and validation. */
  public static List<String> getPartitionTTLStrategyNames() {
    List<String> names = new ArrayList<>(PartitionTTLStrategyType.values().length);
    Arrays.stream(PartitionTTLStrategyType.values())
        .forEach(x -> names.add(x.name()));
    return names;
  }

  /**
   * Resolves the configured strategy class name: the explicit class config wins,
   * otherwise the configured strategy type is mapped to its class.
   *
   * @return the strategy class name, or null when neither config is present
   */
  @Nullable
  public static String getPartitionTTLStrategyClassName(HoodieConfig config) {
    if (config.contains(PARTITION_TTL_STRATEGY_CLASS_NAME)) {
      return config.getString(PARTITION_TTL_STRATEGY_CLASS_NAME);
    } else if (config.contains(PARTITION_TTL_STRATEGY_TYPE)) {
      // BUGFIX: the original resolved the type via KeyGeneratorType.valueOf, a
      // copy-paste error: KEEP_BY_TIME/KEEP_BY_CREATION_TIME are not key generator
      // types, so this threw IllegalArgumentException (or mapped to the wrong class).
      return PartitionTTLStrategyType.valueOf(config.getString(PARTITION_TTL_STRATEGY_TYPE)).getClassName();
    }
    return null;
  }
}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/TTLStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/TTLStrategy.java
new file mode 100644
index 00000000000..ad41f95fba2
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/TTLStrategy.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.ttl.strategy;
+
+/**
+ * Strategy for ttl management.
+ */
+public interface TTLStrategy {
+}
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 0f73b0bce05..9ca7e158a22 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -397,6 +397,11 @@ public class HoodieFlinkCopyOnWriteTable<T>
     throw new HoodieNotSupportedException("Restore is not supported yet");
   }
 
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> 
managePartitionTTL(HoodieEngineContext context, String instantTime) {
+    throw new HoodieNotSupportedException("Manage partition ttl is not 
supported yet");
+  }
+
   @Override
   public HoodieRestoreMetadata restore(HoodieEngineContext context, String 
restoreInstantTimestamp, String savepointToRestoreTimestamp) {
     throw new HoodieNotSupportedException("Savepoint and restore is not 
supported yet");
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 4c080f2f663..13d12e46d7f 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -178,6 +178,11 @@ public class HoodieJavaCopyOnWriteTable<T>
         context, config, this, instantTime, records).execute();
   }
 
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> 
managePartitionTTL(HoodieEngineContext context, String instantTime) {
+    throw new HoodieNotSupportedException("Manage partition ttl is not 
supported yet");
+  }
+
   @Override
   public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext 
context,
                                                          String instantTime,
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 6fdfee16bbe..0308649dbf6 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -265,6 +265,14 @@ public class SparkRDDWriteClient<T> extends
     return new HoodieWriteResult(postWrite(resultRDD, instantTime, table), 
result.getPartitionToReplaceFileIds());
   }
 
+  public HoodieWriteResult managePartitionTTL(String instantTime) {
+    HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE_PARTITION, 
Option.ofNullable(instantTime));
+    preWrite(instantTime, WriteOperationType.DELETE_PARTITION, 
table.getMetaClient());
+    HoodieWriteMetadata<HoodieData<WriteStatus>> result = 
table.managePartitionTTL(context, instantTime);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+    return new HoodieWriteResult(postWrite(resultRDD, instantTime, table), 
result.getPartitionToReplaceFileIds());
+  }
+
   @Override
   protected void initMetadataTable(Option<String> instantTime) {
     // Initialize Metadata Table to make sure it's bootstrapped _before_ the 
operation,
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index e9d21350c21..04c12bbf850 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -71,6 +71,7 @@ import 
org.apache.hudi.table.action.commit.SparkInsertCommitActionExecutor;
 import 
org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor;
 import 
org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitActionExecutor;
 import 
org.apache.hudi.table.action.commit.SparkInsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.commit.SparkPartitionTTLActionExecutor;
 import org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor;
 import 
org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.index.RunIndexActionExecutor;
@@ -226,6 +227,16 @@ public class HoodieSparkCopyOnWriteTable<T>
         shouldRollbackUsingMarkers, isRestore).execute();
   }
 
+  /**
+   * Delete expired partition by config
+   * @param context HoodieEngineContext
+   * @param instantTime Instant Time for the action
+   * @return HoodieWriteMetadata
+   */
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> 
managePartitionTTL(HoodieEngineContext context, String instantTime) {
+    return new SparkPartitionTTLActionExecutor<>(context, config, this, 
instantTime).execute();
+  }
+
   @Override
   public Iterator<List<WriteStatus>> handleUpdate(
       String instantTime, String partitionPath, String fileId,
@@ -322,4 +333,5 @@ public class HoodieSparkCopyOnWriteTable<T>
   public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext 
context, String restoreInstantTimestamp, String savepointToRestoreTimestamp) {
     return new RestorePlanActionExecutor<>(context, config, this, 
restoreInstantTimestamp, savepointToRestoreTimestamp).execute();
   }
+
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkPartitionTTLActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkPartitionTTLActionExecutor.java
new file mode 100644
index 00000000000..166fc3672fa
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkPartitionTTLActionExecutor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import 
org.apache.hudi.table.action.ttl.strategy.HoodiePartitionTTLStrategyFactory;
+import org.apache.hudi.table.action.ttl.strategy.PartitionTTLStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Spark action executor that applies the configured partition TTL strategy:
+ * it computes the expired partition paths and removes them via a
+ * delete-partition (replace) commit.
+ */
+public class SparkPartitionTTLActionExecutor<T>
+    extends BaseSparkCommitActionExecutor<T> {
+
+  // Bug fix: logger was bound to ConsistentBucketBulkInsertDataInternalWriterHelper.class
+  // (copy-paste); bind it to this class so log records carry the correct origin.
+  private static final Logger LOG = LoggerFactory.getLogger(SparkPartitionTTLActionExecutor.class);
+
+  public SparkPartitionTTLActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table,
+                                         String instantTime) {
+    super(context, config, table, instantTime, WriteOperationType.DELETE_PARTITION);
+  }
+
+  /**
+   * Resolves the {@link PartitionTTLStrategy} from config, collects the expired
+   * partition paths, and deletes them through a delete-partition commit.
+   *
+   * @throws HoodieIOException if the strategy fails while determining expired partitions
+   */
+  @Override
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    try {
+      PartitionTTLStrategy strategy = HoodiePartitionTTLStrategyFactory.createStrategy(table, config.getProps(), instantTime);
+      List<String> expiredPartitions = strategy.getExpiredPartitionPaths();
+      LOG.info("Partition ttl find the following expired partitions to delete:  " + String.join(",", expiredPartitions));
+      return new SparkDeletePartitionCommitActionExecutor<>(context, config, table, instantTime, expiredPartitions).execute();
+    } catch (IOException e) {
+      throw new HoodieIOException("Error executing hoodie partition ttl: ", e);
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java
new file mode 100644
index 00000000000..46677c9aaa7
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieTTLConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.action.ttl.strategy.PartitionTTLStrategyType;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
+
+import com.github.davidmoten.guavamini.Sets;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC;
+
+/**
+ * Test Cases for partition ttl management.
+ */
+public class TestPartitionTTLManagement extends HoodieClientTestBase {
+
+  protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
+    return HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2)
+        .withAutoCommit(autoCommit)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().build())
+        
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024
 * 1024 * 1024)
+            
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder()
+            .hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 
1024 * 1024).orcMaxFileSize(1024 * 1024 * 1024).build())
+        .forTable("test-trip-table")
+        
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+        
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+  }
+
+  @Test
+  public void testKeepByCreationTime() {
+    final HoodieWriteConfig cfg = getConfigBuilder(true)
+        .withPath(metaClient.getBasePathV2().toString())
+        .withTTLConfig(HoodieTTLConfig
+            .newBuilder()
+            .withTTLDaysRetain(10)
+            
.withTTLStrategyType(PartitionTTLStrategyType.KEEP_BY_CREATION_TIME)
+            .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().build())
+        .build();
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED);
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      String partitionPath0 = dataGen.getPartitionPaths()[0];
+      String instant0 = getCommitTimeAtUTC(0);
+      writeRecordsForPartition(client, dataGen, partitionPath0, instant0);
+
+      String instant1 = getCommitTimeAtUTC(1000);
+      String partitionPath1 = dataGen.getPartitionPaths()[1];
+      writeRecordsForPartition(client, dataGen, partitionPath1, instant1);
+
+      String currentInstant = client.createNewInstantTime();
+      String partitionPath2 = dataGen.getPartitionPaths()[2];
+      writeRecordsForPartition(client, dataGen, partitionPath2, 
currentInstant);
+
+      HoodieWriteResult result = 
client.managePartitionTTL(client.createNewInstantTime());
+
+      Assertions.assertEquals(Sets.newHashSet(partitionPath0, partitionPath1), 
result.getPartitionToReplaceFileIds().keySet());
+      Assertions.assertEquals(10, readRecords(new String[] {partitionPath0, 
partitionPath1, partitionPath2}).size());
+    }
+  }
+
+  @Test
+  public void testKeepByTime() {
+    final HoodieWriteConfig cfg = getConfigBuilder(true)
+        .withPath(metaClient.getBasePathV2().toString())
+        .withTTLConfig(HoodieTTLConfig
+            .newBuilder()
+            .withTTLDaysRetain(10)
+            .withTTLStrategyType(PartitionTTLStrategyType.KEEP_BY_TIME)
+            .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().build())
+        .build();
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED);
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      String partitionPath0 = dataGen.getPartitionPaths()[0];
+      String instant0 = getCommitTimeAtUTC(0);
+      writeRecordsForPartition(client, dataGen, partitionPath0, instant0);
+
+      String instant1 = getCommitTimeAtUTC(1000);
+      String partitionPath1 = dataGen.getPartitionPaths()[1];
+      writeRecordsForPartition(client, dataGen, partitionPath1, instant1);
+
+      String currentInstant = client.createNewInstantTime();
+      String partitionPath2 = dataGen.getPartitionPaths()[2];
+      writeRecordsForPartition(client, dataGen, partitionPath2, 
currentInstant);
+
+      HoodieWriteResult result = 
client.managePartitionTTL(client.createNewInstantTime());
+
+      Assertions.assertEquals(Sets.newHashSet(partitionPath0, partitionPath1), 
result.getPartitionToReplaceFileIds().keySet());
+
+      // remain 10 rows
+      Assertions.assertEquals(10, readRecords(new String[] {partitionPath0, 
partitionPath1, partitionPath2}).size());
+    }
+  }
+
+  @Test
+  public void testInlinePartitionTTL() {
+    final HoodieWriteConfig cfg = getConfigBuilder(true)
+        .withPath(metaClient.getBasePathV2().toString())
+        .withTTLConfig(HoodieTTLConfig
+            .newBuilder()
+            .withTTLDaysRetain(10)
+            .withTTLStrategyType(PartitionTTLStrategyType.KEEP_BY_TIME)
+            .enableInlinePartitionTTL(true)
+            .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().build())
+        .build();
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED);
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      String partitionPath0 = dataGen.getPartitionPaths()[0];
+      String instant0 = getCommitTimeAtUTC(0);
+      writeRecordsForPartition(client, dataGen, partitionPath0, instant0);
+
+      // All records will be deleted
+      Assertions.assertEquals(0, readRecords(new String[] 
{partitionPath0}).size());
+
+      String instant1 = getCommitTimeAtUTC(1000);
+      String partitionPath1 = dataGen.getPartitionPaths()[1];
+      writeRecordsForPartition(client, dataGen, partitionPath1, instant1);
+
+      // All records will be deleted
+      Assertions.assertEquals(0, readRecords(new String[] 
{partitionPath1}).size());
+
+      String currentInstant = client.createNewInstantTime();
+      String partitionPath2 = dataGen.getPartitionPaths()[2];
+      writeRecordsForPartition(client, dataGen, partitionPath2, 
currentInstant);
+
+      // remain 10 rows
+      Assertions.assertEquals(10, readRecords(new String[] 
{partitionPath2}).size());
+    }
+  }
+
+  private void writeRecordsForPartition(SparkRDDWriteClient client, 
HoodieTestDataGenerator dataGen, String partition, String instantTime) {
+    List<HoodieRecord> records = 
dataGen.generateInsertsForPartition(instantTime, 10, partition);
+    client.startCommitWithTime(instantTime);
+    JavaRDD writeStatuses = client.insert(jsc.parallelize(records, 1), 
instantTime);
+    client.commit(instantTime, writeStatuses);
+  }
+
+  private List<GenericRecord> readRecords(String[] partitions) {
+    return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf,
+        Arrays.stream(partitions).map(p -> Paths.get(basePath, 
p).toString()).collect(Collectors.toList()),
+        basePath, new JobConf(hadoopConf), true, false);
+  }
+
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
index a4f4b2cdf24..efa9c6f120a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
@@ -120,7 +120,7 @@ public class HoodieInstantTimeGenerator {
     }
   }
 
-  private static String fixInstantTimeCompatibility(String instantTime) {
+  public static String fixInstantTimeCompatibility(String instantTime) {
     // Enables backwards compatibility with non-millisecond granularity 
instants
     if (isSecondGranularity(instantTime)) {
       // Add milliseconds to the instant in order to parse successfully
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 9bb6fb6db8d..e12aad789d7 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -91,6 +91,7 @@ object HoodieProcedures {
       ,(ShowTablePropertiesProcedure.NAME, 
ShowTablePropertiesProcedure.builder)
       ,(HelpProcedure.NAME, HelpProcedure.builder)
       ,(ArchiveCommitsProcedure.NAME, ArchiveCommitsProcedure.builder)
+      ,(RunTTLProcedure.NAME, RunTTLProcedure.builder)
     )
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunTTLProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunTTLProcedure.scala
new file mode 100644
index 00000000000..2d3e704ad12
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunTTLProcedure.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.config.HoodieTTLConfig
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.util.function.Supplier
+import scala.collection.JavaConverters._
+
+class RunTTLProcedure extends BaseProcedure with ProcedureBuilder with Logging 
{
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "ttl_policy", DataTypes.StringType),
+    ProcedureParameter.optional(2, "retain_days", DataTypes.IntegerType),
+    ProcedureParameter.optional(3, "options", DataTypes.StringType)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("deleted_partitions", DataTypes.StringType, nullable = true, 
Metadata.empty)
+  ))
+
+  override def build: Procedure = new RunTTLProcedure
+
+  /**
+   * Returns the input parameters of this procedure.
+   */
+  override def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  /**
+   * Returns the type of rows produced by this procedure.
+   */
+  override def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    var confs: Map[String, String] = Map.empty
+    if (getArgValueOrDefault(args, PARAMETERS(1)).isDefined) {
+      confs += HoodieTTLConfig.PARTITION_TTL_STRATEGY_TYPE.key() -> 
getArgValueOrDefault(args, PARAMETERS(1)).get.toString
+    }
+    if (getArgValueOrDefault(args, PARAMETERS(2)).isDefined) {
+      confs += HoodieTTLConfig.DAYS_RETAIN.key() -> getArgValueOrDefault(args, 
PARAMETERS(2)).get.toString
+    }
+    if (getArgValueOrDefault(args, PARAMETERS(3)).isDefined) {
+      confs ++= HoodieCLIUtils.extractOptions(getArgValueOrDefault(args, 
PARAMETERS(3)).get.asInstanceOf[String])
+    }
+
+    val basePath = getBasePath(tableName, Option.empty)
+
+    var client: SparkRDDWriteClient[_] = null
+    try {
+      client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, 
confs,
+        tableName.asInstanceOf[Option[String]])
+      val ttlInstantTime = client.createNewInstantTime()
+      val hoodieTTLMeta = client.managePartitionTTL(ttlInstantTime)
+      if (hoodieTTLMeta == null) {
+        Seq.empty
+      } else {
+        hoodieTTLMeta.getPartitionToReplaceFileIds.keySet().asScala.map { p =>
+          Row(p)
+        }.toSeq
+      }
+    } finally {
+      if (client != null) {
+        client.close()
+      }
+    }
+  }
+}
+
+object RunTTLProcedure {
+  val NAME = "run_ttl"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new RunTTLProcedure
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestTTLProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestTTLProcedure.scala
new file mode 100644
index 00000000000..002375ac462
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestTTLProcedure.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hudi.SparkDatasetMixin
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.{TRIP_EXAMPLE_SCHEMA, 
getCommitTimeAtUTC}
+import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, 
HoodieTestUtils}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+
+import java.util.Properties
+import scala.collection.JavaConverters._
+
+class TestTTLProcedure extends HoodieSparkProcedureTestBase with 
SparkDatasetMixin {
+
+  test("Test Call run_ttl Procedure by Table") {
+    withSQLConf("hoodie.partition.ttl.automatic" -> "false") {
+      withTempDir { tmp => {
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        initTable(basePath)
+
+        val writeConfig = getConfigBuilder(basePath, tableName, true).build()
+        val client = getHoodieWriteClient(writeConfig)
+        val dataGen = new HoodieTestDataGenerator(0xDEED)
+        val partitionPaths = dataGen.getPartitionPaths()
+        val partitionPath0 = partitionPaths(0)
+        val instant0 = getCommitTimeAtUTC(0)
+
+        writeRecordsForPartition(client, dataGen, partitionPath0, instant0)
+
+        val instant1 = getCommitTimeAtUTC(1000)
+        val partitionPath1 = partitionPaths(1)
+        writeRecordsForPartition(client, dataGen, partitionPath1, instant1)
+
+        val currentInstant = client.createNewInstantTime()
+        val partitionPath2 = partitionPaths(2)
+        writeRecordsForPartition(client, dataGen, partitionPath2, 
currentInstant)
+        spark.sql(
+          s"""
+             | create table $tableName using hudi
+             | location '$basePath'
+             | tblproperties (
+             |   primaryKey = '_row_key',
+             |   preCombineField = '_row_key',
+             |   type = 'cow'
+             | )
+             |""".stripMargin)
+
+        checkAnswer(s"call run_ttl(table => '$tableName', retain_days => 1)")(
+          Seq(partitionPath0),
+          Seq(partitionPath1)
+        )
+      }
+      }
+    }
+  }
+
+  private def writeRecordsForPartition(client: SparkRDDWriteClient[Nothing],
+                                       dataGen: HoodieTestDataGenerator,
+                                       partition: String, instantTime: 
String): Unit = {
+    val records: java.util.List[HoodieRecord[Nothing]] =
+      dataGen.generateInsertsForPartition(instantTime, 10, partition)
+        .asInstanceOf[java.util.List[HoodieRecord[Nothing]]]
+    // Use this JavaRDD to call the insert method
+    client.startCommitWithTime(instantTime, HoodieTimeline.COMMIT_ACTION)
+    client.insert(spark.sparkContext.parallelize(records.asScala).toJavaRDD(), 
instantTime)
+  }
+
+  private def getHoodieWriteClient(cfg: HoodieWriteConfig): 
SparkRDDWriteClient[Nothing] = {
+    val writeClient = new SparkRDDWriteClient[Nothing](
+      new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)), 
cfg
+    )
+    writeClient
+  }
+
+  private def initTable(basePath: String): Unit = {
+    val props = new Properties()
+    props.put("hoodie.datasource.write.partitionpath.field", "partition_path")
+    props.put("hoodie.datasource.write.keygenerator.class", 
"org.apache.hudi.keygen.SimpleKeyGenerator")
+    props.put(HoodieTableConfig.PARTITION_FIELDS.key(), "partition_path")
+    props.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key")
+    HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, props);
+  }
+
+  protected def getConfigBuilder(basePath: String, tableName: String, 
autoCommit: Boolean): HoodieWriteConfig.Builder =
+    HoodieWriteConfig
+      .newBuilder
+      .withPath(basePath)
+      .withSchema(TRIP_EXAMPLE_SCHEMA)
+      .withAutoCommit(autoCommit)
+      .withPreCombineField("_row_key")
+      .forTable(tableName)
+
+}
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieTTLJob.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieTTLJob.java
new file mode 100644
index 00000000000..3f11621d9d1
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieTTLJob.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility class to run TTL management.
+ */
+public class HoodieTTLJob {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieTTLJob.class);
+  private final Config cfg;
+  private final TypedProperties props;
+  private final JavaSparkContext jsc;
+  private HoodieTableMetaClient metaClient;
+
+  public HoodieTTLJob(JavaSparkContext jsc, Config cfg) {
+    this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), 
cfg.propsFilePath, cfg.configs),
+        UtilHelpers.createMetaClient(jsc, cfg.basePath, true));
+  }
+
+  public HoodieTTLJob(JavaSparkContext jsc, Config cfg, TypedProperties props, 
HoodieTableMetaClient metaClient) {
+    this.cfg = cfg;
+    this.jsc = jsc;
+    this.props = props;
+    this.metaClient = metaClient;
+    LOG.info("Creating TTL job with configs : " + props.toString());
+    // Disable async cleaning, will trigger synchronous cleaning manually.
+    this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
+    if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
+      // add default lock config options if MDT is enabled.
+      UtilHelpers.addLockOptions(cfg.basePath, this.props);
+    }
+  }
+
+  public void run() {
+    // need to do commit in SparkDeletePartitionCommitActionExecutor#execute
+    this.props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
+    try (SparkRDDWriteClient<HoodieRecordPayload> client =
+             UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", 
cfg.parallelism, Option.empty(), props)) {
+      client.managePartitionTTL(client.createNewInstantTime());
+    }
+  }
+
+  private HoodieWriteConfig getHoodieClientConfig() {
+    return HoodieWriteConfig.newBuilder().combineInput(true, 
true).withPath(cfg.basePath).withAutoCommit(true)
+        .withProps(props).build();
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--base-path", "-sp"}, description = "Base path for 
the table", required = true)
+    public String basePath = null;
+    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism 
for hoodie insert/upsert/delete", required = false)
+    public int parallelism = 1500;
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+    public String sparkMaster = null;
+    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory 
to use", required = false)
+    public String sparkMemory = null;
+
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+
+
+    @Parameter(names = {"--props"}, description = "path to properties file on 
localfs or dfs, with configurations for "
+        + "hoodie client for clustering")
+    public String propsFilePath = null;
+
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration 
that can be set in the properties file "
+        + "(using the CLI parameter \"--props\") can also be passed command 
line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
+    public List<String> configs = new ArrayList<>();
+  }
+
+  public static void main(String[] args) {
+    final HoodieTTLJob.Config cfg = new HoodieTTLJob.Config();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      throw new HoodieException("Failed to run ttl for " + cfg.basePath);
+    }
+
+    String dirName = new Path(cfg.basePath).getName();
+    JavaSparkContext jssc = UtilHelpers.buildSparkContext("hoodie-ttl-job-" + 
dirName, cfg.sparkMaster);
+
+    try {
+      new HoodieTTLJob(jssc, cfg).run();
+    } catch (Throwable throwable) {
+      throw new HoodieException("Failed to run ttl for " + cfg.basePath, 
throwable);
+    } finally {
+      jssc.stop();
+    }
+
+    LOG.info("Hoodie TTL job ran successfully");
+  }
+
+}

Reply via email to