This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 89d267a4bf4 [HUDI-5823] Partition ttl management (#9723)
89d267a4bf4 is described below
commit 89d267a4bf49692297a1c97b97ea71bfa2f4ee74
Author: stream2000 <[email protected]>
AuthorDate: Wed Feb 28 09:02:32 2024 +0800
[HUDI-5823] Partition ttl management (#9723)
---
.../hudi/client/BaseHoodieTableServiceClient.java | 17 ++
.../org/apache/hudi/config/HoodieTTLConfig.java | 130 ++++++++++++++
.../org/apache/hudi/config/HoodieWriteConfig.java | 33 ++++
.../java/org/apache/hudi/table/HoodieTable.java | 8 +
.../HoodiePartitionTTLStrategyFactory.java | 89 ++++++++++
.../ttl/strategy/KeepByCreationTimeStrategy.java | 54 ++++++
.../action/ttl/strategy/KeepByTimeStrategy.java | 110 ++++++++++++
.../action/ttl/strategy/PartitionTTLStrategy.java | 77 +++++++++
.../ttl/strategy/PartitionTTLStrategyType.java | 75 ++++++++
.../table/action/ttl/strategy/TTLStrategy.java | 25 +++
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 5 +
.../hudi/table/HoodieJavaCopyOnWriteTable.java | 5 +
.../apache/hudi/client/SparkRDDWriteClient.java | 8 +
.../hudi/table/HoodieSparkCopyOnWriteTable.java | 12 ++
.../commit/SparkPartitionTTLActionExecutor.java | 59 +++++++
.../hudi/client/TestPartitionTTLManagement.java | 188 +++++++++++++++++++++
.../table/timeline/HoodieInstantTimeGenerator.java | 2 +-
.../hudi/command/procedures/HoodieProcedures.scala | 1 +
.../hudi/command/procedures/RunTTLProcedure.scala | 99 +++++++++++
.../sql/hudi/procedure/TestTTLProcedure.scala | 117 +++++++++++++
.../org/apache/hudi/utilities/HoodieTTLJob.java | 131 ++++++++++++++
21 files changed, 1244 insertions(+), 1 deletion(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 967aaa4f68e..8e89597775d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -484,6 +484,17 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
return false;
}
+ /**
+ * Delete expired partition by config.
+ *
+ * @param instantTime Instant Time for the action
+ * @return HoodieWriteMetadata
+ */
+ public HoodieWriteMetadata<T> managePartitionTTL(String instantTime) {
+ HoodieTable<?, I, ?, T> table = createTable(config,
context.getHadoopConf().get());
+ return table.managePartitionTTL(context, instantTime);
+ }
+
protected abstract void validateClusteringCommit(HoodieWriteMetadata<O>
clusteringMetadata, String clusteringCommitTime, HoodieTable table);
protected abstract HoodieWriteMetadata<O>
convertToOutputMetadata(HoodieWriteMetadata<T> writeMetadata);
@@ -583,6 +594,12 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
metadata.addMetadata(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(),
"true");
inlineScheduleClustering(extraMetadata);
}
+
+ // Do an inline partition ttl management if enabled
+ if (config.isInlinePartitionTTLEnable()) {
+ String instantTime = createNewInstantTime();
+ table.managePartitionTTL(table.getContext(), instantTime);
+ }
}
/**
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieTTLConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieTTLConfig.java
new file mode 100644
index 00000000000..1f9a4e40e98
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieTTLConfig.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.table.action.ttl.strategy.PartitionTTLStrategyType;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Properties;
+
+/**
+ * Hoodie Configs for partition/record level ttl management.
+ */
+@Immutable
+@ConfigClassProperty(name = "TTL management Configs",
+ groupName = ConfigGroups.Names.WRITE_CLIENT,
+ description = "Data ttl management")
+public class HoodieTTLConfig extends HoodieConfig {
+
+ public static final String PARTITION_TTL_STRATEGY_PARAM_PREFIX =
"hoodie.partition.ttl.strategy.";
+
+ public static final String KEEP_BY_TIME_PARTITION_TTL_STRATEGY =
+ "org.apache.hudi.table.action.ttl.strategy.KeepByTimeStrategy";
+ public static final ConfigProperty<Boolean> INLINE_PARTITION_TTL =
ConfigProperty
+ .key("hoodie.partition.ttl.inline")
+ .defaultValue(false)
+ .sinceVersion("1.0.0")
+ .markAdvanced()
+ .withDocumentation("When enabled, the partition ttl management service
is invoked immediately after each commit, "
+ "to delete expired partitions");
+
+ public static final ConfigProperty<String> PARTITION_TTL_STRATEGY_CLASS_NAME
= ConfigProperty
+ .key("hoodie.partition.ttl.strategy.class")
+ .noDefaultValue()
+ .sinceVersion("1.0.0")
+ .markAdvanced()
+ .withDocumentation("Config to provide a strategy class (subclass of
PartitionTTLStrategy) to get the expired partitions");
+
+ public static final ConfigProperty<String> PARTITION_TTL_STRATEGY_TYPE =
ConfigProperty
+ .key("hoodie.partition.ttl.management.strategy.type")
+ .defaultValue(PartitionTTLStrategyType.KEEP_BY_TIME.name())
+ .sinceVersion("1.0.0")
+ .markAdvanced()
+ .withDocumentation("Partition ttl management strategy type to determine
the strategy class");
+
+ public static final ConfigProperty<Integer> DAYS_RETAIN = ConfigProperty
+ .key(PARTITION_TTL_STRATEGY_PARAM_PREFIX + "days.retain")
+ .defaultValue(-1)
+ .sinceVersion("1.0.0")
+ .markAdvanced()
+ .withDocumentation("Partition ttl management KEEP_BY_TIME strategy days
retain");
+
+ public static final ConfigProperty<String> PARTITION_SELECTED =
ConfigProperty
+ .key(PARTITION_TTL_STRATEGY_PARAM_PREFIX + "partition.selected")
+ .noDefaultValue()
+ .markAdvanced()
+ .sinceVersion("1.0.0")
+ .withDocumentation("Partitions to manage ttl");
+
+ public static final ConfigProperty<Integer> MAX_PARTITION_TO_DELETE =
ConfigProperty
+ .key(PARTITION_TTL_STRATEGY_PARAM_PREFIX + "max.delete.partitions")
+ .defaultValue(1000)
+ .markAdvanced()
+ .sinceVersion("1.0.0")
+ .withDocumentation("max partitions to delete in partition ttl
management");
+
+ public static class Builder {
+ private final HoodieTTLConfig ttlConfig = new HoodieTTLConfig();
+
+ public HoodieTTLConfig.Builder withTTLPartitionSelected(String
partitionSelected) {
+ ttlConfig.setValue(PARTITION_SELECTED, partitionSelected);
+ return this;
+ }
+
+ public HoodieTTLConfig.Builder withTTLDaysRetain(Integer daysRetain) {
+ ttlConfig.setValue(DAYS_RETAIN, daysRetain.toString());
+ return this;
+ }
+
+ public HoodieTTLConfig.Builder enableInlinePartitionTTL(Boolean enable) {
+ ttlConfig.setValue(INLINE_PARTITION_TTL, enable.toString());
+ return this;
+ }
+
+ public HoodieTTLConfig.Builder withTTLStrategyClass(String clazz) {
+ ttlConfig.setValue(PARTITION_TTL_STRATEGY_CLASS_NAME, clazz);
+ return this;
+ }
+
+ public HoodieTTLConfig.Builder
withTTLStrategyType(PartitionTTLStrategyType ttlStrategyType) {
+ ttlConfig.setValue(PARTITION_TTL_STRATEGY_TYPE, ttlStrategyType.name());
+ return this;
+ }
+
+ public HoodieTTLConfig.Builder fromProperties(Properties props) {
+ this.ttlConfig.getProps().putAll(props);
+ return this;
+ }
+
+ public HoodieTTLConfig build() {
+ ttlConfig.setDefaults(HoodieTTLConfig.class.getName());
+ return ttlConfig;
+ }
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 8fd3546671e..f4cb386d271 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2697,6 +2697,29 @@ public class HoodieWriteConfig extends HoodieConfig {
return getWriteConcurrencyMode().isNonBlockingConcurrencyControl();
}
+ /**
+ * TTL configs.
+ */
+ public boolean isInlinePartitionTTLEnable() {
+ return getBoolean(HoodieTTLConfig.INLINE_PARTITION_TTL);
+ }
+
+ public String getPartitionTTLStrategyClassName() {
+ return getString(HoodieTTLConfig.PARTITION_TTL_STRATEGY_CLASS_NAME);
+ }
+
+ public Integer getPartitionTTLStrategyDaysRetain() {
+ return getInt(HoodieTTLConfig.DAYS_RETAIN);
+ }
+
+ public String getPartitionTTLPartitionSelected() {
+ return getString(HoodieTTLConfig.PARTITION_SELECTED);
+ }
+
+ public Integer getPartitionTTLMaxPartitionsToDelete() {
+ return getInt(HoodieTTLConfig.MAX_PARTITION_TO_DELETE);
+ }
+
public static class Builder {
protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -2716,6 +2739,8 @@ public class HoodieWriteConfig extends HoodieConfig {
private boolean isCallbackConfigSet = false;
private boolean isPayloadConfigSet = false;
private boolean isMetadataConfigSet = false;
+
+ private boolean isTTLConfigSet = false;
private boolean isLockConfigSet = false;
private boolean isPreCommitValidationConfigSet = false;
private boolean isMetricsJmxConfigSet = false;
@@ -2995,6 +3020,12 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+ public Builder withTTLConfig(HoodieTTLConfig ttlConfig) {
+ writeConfig.getProps().putAll(ttlConfig.getProps());
+ isTTLConfigSet = true;
+ return this;
+ }
+
public Builder withAutoCommit(boolean autoCommit) {
writeConfig.setValue(AUTO_COMMIT_ENABLE, String.valueOf(autoCommit));
return this;
@@ -3262,6 +3293,8 @@ public class HoodieWriteConfig extends HoodieConfig {
final boolean isLockProviderPropertySet =
writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key());
writeConfig.setDefaultOnCondition(!isLockConfigSet,
HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
+ writeConfig.setDefaultOnCondition(!isTTLConfigSet,
+
HoodieTTLConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
autoAdjustConfigsForConcurrencyMode(isLockProviderPropertySet);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 080fe5f357d..3b78fb09090 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -291,6 +291,14 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
*/
public abstract HoodieWriteMetadata<O>
insertOverwriteTable(HoodieEngineContext context, String instantTime, I
records);
+ /**
+ * Delete expired partition by config
+ * @param context HoodieEngineContext
+ * @param instantTime Instant Time for the action
+ * @return HoodieWriteMetadata
+ */
+ public abstract HoodieWriteMetadata<O>
managePartitionTTL(HoodieEngineContext context, String instantTime);
+
public HoodieWriteConfig getConfig() {
return config;
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/HoodiePartitionTTLStrategyFactory.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/HoodiePartitionTTLStrategyFactory.java
new file mode 100644
index 00000000000..26bcaa9fe51
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/HoodiePartitionTTLStrategyFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.ttl.strategy;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieTTLConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Locale;
+
+import static
org.apache.hudi.config.HoodieTTLConfig.PARTITION_TTL_STRATEGY_CLASS_NAME;
+import static
org.apache.hudi.config.HoodieTTLConfig.PARTITION_TTL_STRATEGY_TYPE;
+
+/**
+ * Factory help to create {@link PartitionTTLStrategy}.
+ * <p>
+ * This factory will try {@link
HoodieTTLConfig#PARTITION_TTL_STRATEGY_CLASS_NAME} firstly,
+ * this ensures the class prop will not be overwritten by {@link
PartitionTTLStrategyType}
+ */
+public class HoodiePartitionTTLStrategyFactory {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodiePartitionTTLStrategyFactory.class);
+
+ public static PartitionTTLStrategy createStrategy(HoodieTable hoodieTable,
TypedProperties props, String instantTime) throws IOException {
+ String strategyClassName = getPartitionTTLStrategyClassName(props);
+ try {
+ return (PartitionTTLStrategy)
ReflectionUtils.loadClass(strategyClassName,
+ new Class<?>[] {HoodieTable.class, String.class}, hoodieTable,
instantTime);
+ } catch (Throwable e) {
+ throw new IOException("Could not load partition ttl management strategy
class " + strategyClassName, e);
+ }
+ }
+
+ private static String getPartitionTTLStrategyClassName(TypedProperties
props) {
+ String strategyClassName =
+ props.getString(PARTITION_TTL_STRATEGY_CLASS_NAME.key(), null);
+ if (StringUtils.isNullOrEmpty(strategyClassName)) {
+ String strategyType = props.getString(PARTITION_TTL_STRATEGY_TYPE.key(),
+ PARTITION_TTL_STRATEGY_TYPE.defaultValue());
+ PartitionTTLStrategyType strategyTypeEnum;
+ try {
+ strategyTypeEnum =
PartitionTTLStrategyType.valueOf(strategyType.toUpperCase(Locale.ROOT));
+ } catch (IllegalArgumentException e) {
+ throw new HoodieException("Unsupported PartitionTTLStrategy Type " +
strategyType);
+ }
+ strategyClassName = getPartitionTTLStrategyFromType(strategyTypeEnum);
+ }
+ return strategyClassName;
+ }
+
+ /**
+ * @param type {@link PartitionTTLStrategyType} enum.
+ * @return The partition ttl management strategy class name based on the
{@link PartitionTTLStrategyType}.
+ */
+ public static String
getPartitionTTLStrategyFromType(PartitionTTLStrategyType type) {
+ switch (type) {
+ case KEEP_BY_TIME:
+ return KeepByTimeStrategy.class.getName();
+ case KEEP_BY_CREATION_TIME:
+ return KeepByCreationTimeStrategy.class.getName();
+ default:
+ throw new HoodieException("Unsupported PartitionTTLStrategy Type " +
type);
+ }
+ }
+
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByCreationTimeStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByCreationTimeStrategy.java
new file mode 100644
index 00000000000..a350086f2dc
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByCreationTimeStrategy.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.ttl.strategy;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * KeepByCreationTimeStrategy will return expired partitions by their partition creation time.
+ */
+public class KeepByCreationTimeStrategy extends KeepByTimeStrategy {
+
+ public KeepByCreationTimeStrategy(HoodieTable hoodieTable, String
instantTime) {
+ super(hoodieTable, instantTime);
+ }
+
+ @Override
+ protected List<String> getExpiredPartitionsForTimeStrategy(List<String>
partitionPathsForTTL) {
+ HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+ return partitionPathsForTTL.stream().parallel().filter(part -> {
+ HoodiePartitionMetadata hoodiePartitionMetadata =
+ new HoodiePartitionMetadata(metaClient.getFs(),
FSUtils.getPartitionPath(metaClient.getBasePath(), part));
+ Option<String> instantOption =
hoodiePartitionMetadata.readPartitionCreatedCommitTime();
+ if (instantOption.isPresent()) {
+ String instantTime = instantOption.get();
+ return isPartitionExpired(instantTime);
+ }
+ return false;
+ }).collect(Collectors.toList());
+ }
+
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java
new file mode 100644
index 00000000000..b6d67bb9e8a
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.ttl.strategy;
+
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.table.HoodieTable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.fixInstantTimeCompatibility;
+import static
org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.instantTimePlusMillis;
+
+/**
+ * KeepByTimeStrategy will return expired partitions by their lastCommitTime.
+ */
+public class KeepByTimeStrategy extends PartitionTTLStrategy {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KeepByTimeStrategy.class);
+
+ protected final long ttlInMilis;
+
+ public KeepByTimeStrategy(HoodieTable hoodieTable, String instantTime) {
+ super(hoodieTable, instantTime);
+ this.ttlInMilis = writeConfig.getPartitionTTLStrategyDaysRetain() * 1000 *
3600 * 24;
+ }
+
+ @Override
+ public List<String> getExpiredPartitionPaths() {
+ Option<HoodieInstant> lastCompletedInstant =
hoodieTable.getActiveTimeline().filterCompletedInstants().lastInstant();
+ if (!lastCompletedInstant.isPresent() || ttlInMilis <= 0
+ ||
!hoodieTable.getMetaClient().getTableConfig().getPartitionFields().isPresent())
{
+ return Collections.emptyList();
+ }
+ List<String> expiredPartitions =
getExpiredPartitionsForTimeStrategy(getPartitionPathsForTTL());
+ int limit = writeConfig.getPartitionTTLMaxPartitionsToDelete();
+ LOG.info("Total expired partitions count {}, limit {}",
expiredPartitions.size(), limit);
+ return expiredPartitions.stream()
+ .limit(limit) // Avoid a single replace commit too large
+ .collect(Collectors.toList());
+ }
+
+ protected List<String> getExpiredPartitionsForTimeStrategy(List<String>
partitionsForTTLManagement) {
+ HoodieTimer timer = HoodieTimer.start();
+ Map<String, Option<String>> lastCommitTimeForPartitions =
getLastCommitTimeForPartitions(partitionsForTTLManagement);
+ LOG.info("Collect last commit time for partitions cost {} ms",
timer.endTimer());
+ return lastCommitTimeForPartitions.entrySet()
+ .stream()
+ .filter(entry -> entry.getValue().isPresent())
+ .filter(entry -> isPartitionExpired(entry.getValue().get()))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * @param partitionPaths Partitions to collect stats.
+ */
+ private Map<String, Option<String>>
getLastCommitTimeForPartitions(List<String> partitionPaths) {
+ int statsParallelism = Math.min(partitionPaths.size(), 200);
+ return hoodieTable.getContext().map(partitionPaths, partitionPath -> {
+ Option<String> partitionLastModifiedTime = hoodieTable.getHoodieView()
+ .getLatestFileSlicesBeforeOrOn(partitionPath, instantTime, true)
+ .map(FileSlice::getBaseInstantTime)
+ .max(Comparator.naturalOrder())
+ .map(Option::ofNullable)
+ .orElse(Option.empty());
+ return Pair.of(partitionPath, partitionLastModifiedTime);
+ }, statsParallelism).stream().collect(Collectors.toMap(Pair::getKey,
Pair::getValue));
+ }
+
+ /**
+ * Determines if a partition's reference time has exceeded its time-to-live
(TTL).
+ * <p>
+ * This method checks if the current time has passed the TTL threshold based
on a
+ * reference time, which could be the creation time or the last commit time
of the partition.
+ *
+ * @param referenceTime last commit time or creation time for partition
+ */
+ protected boolean isPartitionExpired(String referenceTime) {
+ String expiredTime =
instantTimePlusMillis(fixInstantTimeCompatibility(referenceTime), ttlInMilis);
+ return fixInstantTimeCompatibility(instantTime).compareTo(expiredTime) > 0;
+ }
+
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategy.java
new file mode 100644
index 00000000000..47768870930
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategy.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.ttl.strategy;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Strategy for partition-level ttl management.
+ */
+public abstract class PartitionTTLStrategy implements TTLStrategy,
Serializable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PartitionTTLStrategy.class);
+
+ protected final HoodieTable hoodieTable;
+ protected final HoodieWriteConfig writeConfig;
+ protected final String instantTime;
+
+ public PartitionTTLStrategy(HoodieTable hoodieTable, String instantTime) {
+ this.writeConfig = hoodieTable.getConfig();
+ this.hoodieTable = hoodieTable;
+ this.instantTime = instantTime;
+ }
+
+ /**
+ * Get expired partition paths for a specific partition ttl strategy.
+ *
+ * @return Expired partition paths.
+ */
+ public abstract List<String> getExpiredPartitionPaths();
+
+ /**
+ * Scan and list all partitions for partition ttl management.
+ *
+ * @return all partitions paths for the dataset.
+ */
+ protected List<String> getPartitionPathsForTTL() {
+ String partitionSelected = writeConfig.getClusteringPartitionSelected();
+ HoodieTimer timer = HoodieTimer.start();
+ List<String> partitionsForTTL;
+ if (StringUtils.isNullOrEmpty(partitionSelected)) {
+ // Return All partition paths
+ partitionsForTTL =
FSUtils.getAllPartitionPaths(hoodieTable.getContext(),
writeConfig.getMetadataConfig(), writeConfig.getBasePath());
+ } else {
+ partitionsForTTL = Arrays.asList(partitionSelected.split(","));
+ }
+ LOG.info("Get partitions for ttl cost {} ms", timer.endTimer());
+ return partitionsForTTL;
+ }
+
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategyType.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategyType.java
new file mode 100644
index 00000000000..6dfbcd6d0e5
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategyType.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.ttl.strategy;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.keygen.constant.KeyGeneratorType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static
org.apache.hudi.config.HoodieTTLConfig.PARTITION_TTL_STRATEGY_CLASS_NAME;
+import static
org.apache.hudi.config.HoodieTTLConfig.PARTITION_TTL_STRATEGY_TYPE;
+
+/**
+ * Types of {@link PartitionTTLStrategy}.
+ */
+public enum PartitionTTLStrategyType {
+ KEEP_BY_TIME("org.apache.hudi.table.action.ttl.strategy.KeepByTimeStrategy"),
+
KEEP_BY_CREATION_TIME("org.apache.hudi.table.action.ttl.strategy.KeepByCreationTimeStrategy");
+
+ private final String className;
+
+ PartitionTTLStrategyType(String className) {
+ this.className = className;
+ }
+
+ public String getClassName() {
+ return className;
+ }
+
+ public static PartitionTTLStrategyType fromClassName(String className) {
+ for (PartitionTTLStrategyType type : PartitionTTLStrategyType.values()) {
+ if (type.getClassName().equals(className)) {
+ return type;
+ }
+ }
+ throw new IllegalArgumentException("No PartitionTTLStrategyType found for
class name: " + className);
+ }
+
+ public static List<String> getPartitionTTLStrategyNames() {
+ List<String> names = new
ArrayList<>(PartitionTTLStrategyType.values().length);
+ Arrays.stream(PartitionTTLStrategyType.values())
+ .forEach(x -> names.add(x.name()));
+ return names;
+ }
+
+ @Nullable
+ public static String getPartitionTTLStrategyClassName(HoodieConfig config) {
+ if (config.contains(PARTITION_TTL_STRATEGY_CLASS_NAME)) {
+ return config.getString(PARTITION_TTL_STRATEGY_CLASS_NAME);
+ } else if (config.contains(PARTITION_TTL_STRATEGY_TYPE)) {
+ return
KeyGeneratorType.valueOf(config.getString(PARTITION_TTL_STRATEGY_TYPE)).getClassName();
+ }
+ return null;
+ }
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/TTLStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/TTLStrategy.java
new file mode 100644
index 00000000000..ad41f95fba2
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/TTLStrategy.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.ttl.strategy;
+
+/**
+ * Strategy for ttl management.
+ */
+public interface TTLStrategy {
+}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 0f73b0bce05..9ca7e158a22 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -397,6 +397,11 @@ public class HoodieFlinkCopyOnWriteTable<T>
throw new HoodieNotSupportedException("Restore is not supported yet");
}
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>>
managePartitionTTL(HoodieEngineContext context, String instantTime) {
+ throw new HoodieNotSupportedException("Manage partition ttl is not
supported yet");
+ }
+
@Override
public HoodieRestoreMetadata restore(HoodieEngineContext context, String
restoreInstantTimestamp, String savepointToRestoreTimestamp) {
throw new HoodieNotSupportedException("Savepoint and restore is not
supported yet");
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 4c080f2f663..13d12e46d7f 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -178,6 +178,11 @@ public class HoodieJavaCopyOnWriteTable<T>
context, config, this, instantTime, records).execute();
}
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>>
managePartitionTTL(HoodieEngineContext context, String instantTime) {
+ throw new HoodieNotSupportedException("Manage partition ttl is not
supported yet");
+ }
+
@Override
public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext
context,
String instantTime,
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 6fdfee16bbe..0308649dbf6 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -265,6 +265,14 @@ public class SparkRDDWriteClient<T> extends
return new HoodieWriteResult(postWrite(resultRDD, instantTime, table),
result.getPartitionToReplaceFileIds());
}
+ public HoodieWriteResult managePartitionTTL(String instantTime) {
+ HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>,
HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE_PARTITION,
Option.ofNullable(instantTime));
+ preWrite(instantTime, WriteOperationType.DELETE_PARTITION,
table.getMetaClient());
+ HoodieWriteMetadata<HoodieData<WriteStatus>> result =
table.managePartitionTTL(context, instantTime);
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD =
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+ return new HoodieWriteResult(postWrite(resultRDD, instantTime, table),
result.getPartitionToReplaceFileIds());
+ }
+
@Override
protected void initMetadataTable(Option<String> instantTime) {
// Initialize Metadata Table to make sure it's bootstrapped _before_ the
operation,
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index e9d21350c21..04c12bbf850 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -71,6 +71,7 @@ import
org.apache.hudi.table.action.commit.SparkInsertCommitActionExecutor;
import
org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor;
import
org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitActionExecutor;
import
org.apache.hudi.table.action.commit.SparkInsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.commit.SparkPartitionTTLActionExecutor;
import org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor;
import
org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.index.RunIndexActionExecutor;
@@ -226,6 +227,16 @@ public class HoodieSparkCopyOnWriteTable<T>
shouldRollbackUsingMarkers, isRestore).execute();
}
+ /**
+ * Delete expired partition by config
+ * @param context HoodieEngineContext
+ * @param instantTime Instant Time for the action
+ * @return HoodieWriteMetadata
+ */
+ public HoodieWriteMetadata<HoodieData<WriteStatus>>
managePartitionTTL(HoodieEngineContext context, String instantTime) {
+ return new SparkPartitionTTLActionExecutor<>(context, config, this,
instantTime).execute();
+ }
+
@Override
public Iterator<List<WriteStatus>> handleUpdate(
String instantTime, String partitionPath, String fileId,
@@ -322,4 +333,5 @@ public class HoodieSparkCopyOnWriteTable<T>
public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext
context, String restoreInstantTimestamp, String savepointToRestoreTimestamp) {
return new RestorePlanActionExecutor<>(context, config, this,
restoreInstantTimestamp, savepointToRestoreTimestamp).execute();
}
+
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkPartitionTTLActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkPartitionTTLActionExecutor.java
new file mode 100644
index 00000000000..166fc3672fa
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkPartitionTTLActionExecutor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import
org.apache.hudi.table.action.ttl.strategy.HoodiePartitionTTLStrategyFactory;
+import org.apache.hudi.table.action.ttl.strategy.PartitionTTLStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class SparkPartitionTTLActionExecutor<T>
+ extends BaseSparkCommitActionExecutor<T> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkPartitionTTLActionExecutor.class);
+
+ public SparkPartitionTTLActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config, HoodieTable table,
+ String instantTime) {
+ super(context, config, table, instantTime,
WriteOperationType.DELETE_PARTITION);
+ }
+
+ @Override
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+ try {
+ PartitionTTLStrategy strategy =
HoodiePartitionTTLStrategyFactory.createStrategy(table, config.getProps(),
instantTime);
+ List<String> expiredPartitions = strategy.getExpiredPartitionPaths();
+ LOG.info("Partition ttl find the following expired partitions to delete:
" + String.join(",", expiredPartitions));
+ return new SparkDeletePartitionCommitActionExecutor<>(context, config,
table, instantTime, expiredPartitions).execute();
+ } catch (IOException e) {
+ throw new HoodieIOException("Error executing hoodie partition ttl", e);
+ }
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java
new file mode 100644
index 00000000000..46677c9aaa7
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieTTLConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.action.ttl.strategy.PartitionTTLStrategyType;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
+
+import com.github.davidmoten.guavamini.Sets;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC;
+
+/**
+ * Test Cases for partition ttl management.
+ */
+public class TestPartitionTTLManagement extends HoodieClientTestBase {
+
+ protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
+ return HoodieWriteConfig.newBuilder().withPath(basePath)
+ .withSchema(TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2, 2)
+ .withAutoCommit(autoCommit)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().build())
+
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024
* 1024 * 1024)
+
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .withStorageConfig(HoodieStorageConfig.newBuilder()
+ .hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 *
1024 * 1024).orcMaxFileSize(1024 * 1024 * 1024).build())
+ .forTable("test-trip-table")
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+ }
+
+ @Test
+ public void testKeepByCreationTime() {
+ final HoodieWriteConfig cfg = getConfigBuilder(true)
+ .withPath(metaClient.getBasePathV2().toString())
+ .withTTLConfig(HoodieTTLConfig
+ .newBuilder()
+ .withTTLDaysRetain(10)
+
.withTTLStrategyType(PartitionTTLStrategyType.KEEP_BY_CREATION_TIME)
+ .build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().build())
+ .build();
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+ String partitionPath0 = dataGen.getPartitionPaths()[0];
+ String instant0 = getCommitTimeAtUTC(0);
+ writeRecordsForPartition(client, dataGen, partitionPath0, instant0);
+
+ String instant1 = getCommitTimeAtUTC(1000);
+ String partitionPath1 = dataGen.getPartitionPaths()[1];
+ writeRecordsForPartition(client, dataGen, partitionPath1, instant1);
+
+ String currentInstant = client.createNewInstantTime();
+ String partitionPath2 = dataGen.getPartitionPaths()[2];
+ writeRecordsForPartition(client, dataGen, partitionPath2,
currentInstant);
+
+ HoodieWriteResult result =
client.managePartitionTTL(client.createNewInstantTime());
+
+ Assertions.assertEquals(Sets.newHashSet(partitionPath0, partitionPath1),
result.getPartitionToReplaceFileIds().keySet());
+ Assertions.assertEquals(10, readRecords(new String[] {partitionPath0,
partitionPath1, partitionPath2}).size());
+ }
+ }
+
+ @Test
+ public void testKeepByTime() {
+ final HoodieWriteConfig cfg = getConfigBuilder(true)
+ .withPath(metaClient.getBasePathV2().toString())
+ .withTTLConfig(HoodieTTLConfig
+ .newBuilder()
+ .withTTLDaysRetain(10)
+ .withTTLStrategyType(PartitionTTLStrategyType.KEEP_BY_TIME)
+ .build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().build())
+ .build();
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+ String partitionPath0 = dataGen.getPartitionPaths()[0];
+ String instant0 = getCommitTimeAtUTC(0);
+ writeRecordsForPartition(client, dataGen, partitionPath0, instant0);
+
+ String instant1 = getCommitTimeAtUTC(1000);
+ String partitionPath1 = dataGen.getPartitionPaths()[1];
+ writeRecordsForPartition(client, dataGen, partitionPath1, instant1);
+
+ String currentInstant = client.createNewInstantTime();
+ String partitionPath2 = dataGen.getPartitionPaths()[2];
+ writeRecordsForPartition(client, dataGen, partitionPath2,
currentInstant);
+
+ HoodieWriteResult result =
client.managePartitionTTL(client.createNewInstantTime());
+
+ Assertions.assertEquals(Sets.newHashSet(partitionPath0, partitionPath1),
result.getPartitionToReplaceFileIds().keySet());
+
+ // remain 10 rows
+ Assertions.assertEquals(10, readRecords(new String[] {partitionPath0,
partitionPath1, partitionPath2}).size());
+ }
+ }
+
+ @Test
+ public void testInlinePartitionTTL() {
+ final HoodieWriteConfig cfg = getConfigBuilder(true)
+ .withPath(metaClient.getBasePathV2().toString())
+ .withTTLConfig(HoodieTTLConfig
+ .newBuilder()
+ .withTTLDaysRetain(10)
+ .withTTLStrategyType(PartitionTTLStrategyType.KEEP_BY_TIME)
+ .enableInlinePartitionTTL(true)
+ .build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().build())
+ .build();
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+ String partitionPath0 = dataGen.getPartitionPaths()[0];
+ String instant0 = getCommitTimeAtUTC(0);
+ writeRecordsForPartition(client, dataGen, partitionPath0, instant0);
+
+ // All records will be deleted
+ Assertions.assertEquals(0, readRecords(new String[]
{partitionPath0}).size());
+
+ String instant1 = getCommitTimeAtUTC(1000);
+ String partitionPath1 = dataGen.getPartitionPaths()[1];
+ writeRecordsForPartition(client, dataGen, partitionPath1, instant1);
+
+ // All records will be deleted
+ Assertions.assertEquals(0, readRecords(new String[]
{partitionPath1}).size());
+
+ String currentInstant = client.createNewInstantTime();
+ String partitionPath2 = dataGen.getPartitionPaths()[2];
+ writeRecordsForPartition(client, dataGen, partitionPath2,
currentInstant);
+
+ // remain 10 rows
+ Assertions.assertEquals(10, readRecords(new String[]
{partitionPath2}).size());
+ }
+ }
+
+ private void writeRecordsForPartition(SparkRDDWriteClient client,
HoodieTestDataGenerator dataGen, String partition, String instantTime) {
+ List<HoodieRecord> records =
dataGen.generateInsertsForPartition(instantTime, 10, partition);
+ client.startCommitWithTime(instantTime);
+ JavaRDD writeStatuses = client.insert(jsc.parallelize(records, 1),
instantTime);
+ client.commit(instantTime, writeStatuses);
+ }
+
+ private List<GenericRecord> readRecords(String[] partitions) {
+ return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf,
+ Arrays.stream(partitions).map(p -> Paths.get(basePath,
p).toString()).collect(Collectors.toList()),
+ basePath, new JobConf(hadoopConf), true, false);
+ }
+
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
index a4f4b2cdf24..efa9c6f120a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
@@ -120,7 +120,7 @@ public class HoodieInstantTimeGenerator {
}
}
- private static String fixInstantTimeCompatibility(String instantTime) {
+ public static String fixInstantTimeCompatibility(String instantTime) {
// Enables backwards compatibility with non-millisecond granularity
instants
if (isSecondGranularity(instantTime)) {
// Add milliseconds to the instant in order to parse successfully
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 9bb6fb6db8d..e12aad789d7 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -91,6 +91,7 @@ object HoodieProcedures {
,(ShowTablePropertiesProcedure.NAME,
ShowTablePropertiesProcedure.builder)
,(HelpProcedure.NAME, HelpProcedure.builder)
,(ArchiveCommitsProcedure.NAME, ArchiveCommitsProcedure.builder)
+ ,(RunTTLProcedure.NAME, RunTTLProcedure.builder)
)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunTTLProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunTTLProcedure.scala
new file mode 100644
index 00000000000..2d3e704ad12
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunTTLProcedure.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.config.HoodieTTLConfig
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
+
+import java.util.function.Supplier
+import scala.collection.JavaConverters._
+
+class RunTTLProcedure extends BaseProcedure with ProcedureBuilder with Logging
{
+
+ private val PARAMETERS = Array[ProcedureParameter](
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
+ ProcedureParameter.optional(1, "ttl_policy", DataTypes.StringType),
+ ProcedureParameter.optional(2, "retain_days", DataTypes.IntegerType),
+ ProcedureParameter.optional(3, "options", DataTypes.StringType)
+ )
+
+ private val OUTPUT_TYPE = new StructType(Array[StructField](
+ StructField("deleted_partitions", DataTypes.StringType, nullable = true,
Metadata.empty)
+ ))
+
+ override def build: Procedure = new RunTTLProcedure
+
+ /**
+ * Returns the input parameters of this procedure.
+ */
+ override def parameters: Array[ProcedureParameter] = PARAMETERS
+
+ /**
+ * Returns the type of rows produced by this procedure.
+ */
+ override def outputType: StructType = OUTPUT_TYPE
+
+ override def call(args: ProcedureArgs): Seq[Row] = {
+ super.checkArgs(PARAMETERS, args)
+
+ val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+ var confs: Map[String, String] = Map.empty
+ if (getArgValueOrDefault(args, PARAMETERS(1)).isDefined) {
+ confs += HoodieTTLConfig.PARTITION_TTL_STRATEGY_TYPE.key() ->
getArgValueOrDefault(args, PARAMETERS(1)).get.toString
+ }
+ if (getArgValueOrDefault(args, PARAMETERS(2)).isDefined) {
+ confs += HoodieTTLConfig.DAYS_RETAIN.key() -> getArgValueOrDefault(args,
PARAMETERS(2)).get.toString
+ }
+ if (getArgValueOrDefault(args, PARAMETERS(3)).isDefined) {
+ confs ++= HoodieCLIUtils.extractOptions(getArgValueOrDefault(args,
PARAMETERS(3)).get.asInstanceOf[String])
+ }
+
+ val basePath = getBasePath(tableName, Option.empty)
+
+ var client: SparkRDDWriteClient[_] = null
+ try {
+ client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath,
confs,
+ tableName.asInstanceOf[Option[String]])
+ val ttlInstantTime = client.createNewInstantTime()
+ val hoodieTTLMeta = client.managePartitionTTL(ttlInstantTime)
+ if (hoodieTTLMeta == null) {
+ Seq.empty
+ } else {
+ hoodieTTLMeta.getPartitionToReplaceFileIds.keySet().asScala.map { p =>
+ Row(p)
+ }.toSeq
+ }
+ } finally {
+ if (client != null) {
+ client.close()
+ }
+ }
+ }
+}
+
+object RunTTLProcedure {
+ val NAME = "run_ttl"
+
+ def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+ override def get() = new RunTTLProcedure
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestTTLProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestTTLProcedure.scala
new file mode 100644
index 00000000000..002375ac462
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestTTLProcedure.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hudi.SparkDatasetMixin
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import
org.apache.hudi.common.testutils.HoodieTestDataGenerator.{TRIP_EXAMPLE_SCHEMA,
getCommitTimeAtUTC}
+import org.apache.hudi.common.testutils.{HoodieTestDataGenerator,
HoodieTestUtils}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+
+import java.util.Properties
+import scala.collection.JavaConverters._
+
+class TestTTLProcedure extends HoodieSparkProcedureTestBase with
SparkDatasetMixin {
+
+ test("Test Call run_ttl Procedure by Table") {
+ withSQLConf("hoodie.partition.ttl.automatic" -> "false") {
+ withTempDir { tmp => {
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ initTable(basePath)
+
+ val writeConfig = getConfigBuilder(basePath, tableName, true).build()
+ val client = getHoodieWriteClient(writeConfig)
+ val dataGen = new HoodieTestDataGenerator(0xDEED)
+ val partitionPaths = dataGen.getPartitionPaths()
+ val partitionPath0 = partitionPaths(0)
+ val instant0 = getCommitTimeAtUTC(0)
+
+ writeRecordsForPartition(client, dataGen, partitionPath0, instant0)
+
+ val instant1 = getCommitTimeAtUTC(1000)
+ val partitionPath1 = partitionPaths(1)
+ writeRecordsForPartition(client, dataGen, partitionPath1, instant1)
+
+ val currentInstant = client.createNewInstantTime()
+ val partitionPath2 = partitionPaths(2)
+ writeRecordsForPartition(client, dataGen, partitionPath2,
currentInstant)
+ spark.sql(
+ s"""
+ | create table $tableName using hudi
+ | location '$basePath'
+ | tblproperties (
+ | primaryKey = '_row_key',
+ | preCombineField = '_row_key',
+ | type = 'cow'
+ | )
+ |""".stripMargin)
+
+ checkAnswer(s"call run_ttl(table => '$tableName', retain_days => 1)")(
+ Seq(partitionPath0),
+ Seq(partitionPath1)
+ )
+ }
+ }
+ }
+ }
+
+ private def writeRecordsForPartition(client: SparkRDDWriteClient[Nothing],
+ dataGen: HoodieTestDataGenerator,
+ partition: String, instantTime:
String): Unit = {
+ val records: java.util.List[HoodieRecord[Nothing]] =
+ dataGen.generateInsertsForPartition(instantTime, 10, partition)
+ .asInstanceOf[java.util.List[HoodieRecord[Nothing]]]
+ // Use this JavaRDD to call the insert method
+ client.startCommitWithTime(instantTime, HoodieTimeline.COMMIT_ACTION)
+ client.insert(spark.sparkContext.parallelize(records.asScala).toJavaRDD(),
instantTime)
+ }
+
+ private def getHoodieWriteClient(cfg: HoodieWriteConfig):
SparkRDDWriteClient[Nothing] = {
+ val writeClient = new SparkRDDWriteClient[Nothing](
+ new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)),
cfg
+ )
+ writeClient
+ }
+
+ private def initTable(basePath: String): Unit = {
+ val props = new Properties()
+ props.put("hoodie.datasource.write.partitionpath.field", "partition_path")
+ props.put("hoodie.datasource.write.keygenerator.class",
"org.apache.hudi.keygen.SimpleKeyGenerator")
+ props.put(HoodieTableConfig.PARTITION_FIELDS.key(), "partition_path")
+ props.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key")
+ HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, props);
+ }
+
+ protected def getConfigBuilder(basePath: String, tableName: String,
autoCommit: Boolean): HoodieWriteConfig.Builder =
+ HoodieWriteConfig
+ .newBuilder
+ .withPath(basePath)
+ .withSchema(TRIP_EXAMPLE_SCHEMA)
+ .withAutoCommit(autoCommit)
+ .withPreCombineField("_row_key")
+ .forTable(tableName)
+
+}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieTTLJob.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieTTLJob.java
new file mode 100644
index 00000000000..3f11621d9d1
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieTTLJob.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility class to run TTL management.
+ */
+public class HoodieTTLJob {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieTTLJob.class);
+ private final Config cfg;
+ private final TypedProperties props;
+ private final JavaSparkContext jsc;
+ private HoodieTableMetaClient metaClient;
+
+ public HoodieTTLJob(JavaSparkContext jsc, Config cfg) {
+ this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(),
cfg.propsFilePath, cfg.configs),
+ UtilHelpers.createMetaClient(jsc, cfg.basePath, true));
+ }
+
+ public HoodieTTLJob(JavaSparkContext jsc, Config cfg, TypedProperties props,
HoodieTableMetaClient metaClient) {
+ this.cfg = cfg;
+ this.jsc = jsc;
+ this.props = props;
+ this.metaClient = metaClient;
+ LOG.info("Creating TTL job with configs : " + props.toString());
+ // Disable async cleaning, will trigger synchronous cleaning manually.
+ this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
+ if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
+ // add default lock config options if MDT is enabled.
+ UtilHelpers.addLockOptions(cfg.basePath, this.props);
+ }
+ }
+
+ public void run() {
+ // need to do commit in SparkDeletePartitionCommitActionExecutor#execute
+ this.props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
+ try (SparkRDDWriteClient<HoodieRecordPayload> client =
+ UtilHelpers.createHoodieClient(jsc, cfg.basePath, "",
cfg.parallelism, Option.empty(), props)) {
+ client.managePartitionTTL(client.createNewInstantTime());
+ }
+ }
+
+ private HoodieWriteConfig getHoodieClientConfig() {
+ return HoodieWriteConfig.newBuilder().combineInput(true,
true).withPath(cfg.basePath).withAutoCommit(true)
+ .withProps(props).build();
+ }
+
+ public static class Config implements Serializable {
+ @Parameter(names = {"--base-path", "-sp"}, description = "Base path for
the table", required = true)
+ public String basePath = null;
+ @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism
for hoodie insert/upsert/delete", required = false)
+ public int parallelism = 1500;
+ @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+ public String sparkMaster = null;
+ @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory
to use", required = false)
+ public String sparkMemory = null;
+
+ @Parameter(names = {"--help", "-h"}, help = true)
+ public Boolean help = false;
+
+
+ @Parameter(names = {"--props"}, description = "path to properties file on
localfs or dfs, with configurations for "
+ + "hoodie client for clustering")
+ public String propsFilePath = null;
+
+ @Parameter(names = {"--hoodie-conf"}, description = "Any configuration
that can be set in the properties file "
+ + "(using the CLI parameter \"--props\") can also be passed command
line using this parameter. This can be repeated",
+ splitter = IdentitySplitter.class)
+ public List<String> configs = new ArrayList<>();
+ }
+
+ public static void main(String[] args) {
+ final HoodieTTLJob.Config cfg = new HoodieTTLJob.Config();
+ JCommander cmd = new JCommander(cfg, null, args);
+ if (cfg.help || args.length == 0) {
+ cmd.usage();
+ throw new HoodieException("Failed to run ttl for " + cfg.basePath);
+ }
+
+ String dirName = new Path(cfg.basePath).getName();
+ JavaSparkContext jssc = UtilHelpers.buildSparkContext("hoodie-ttl-job-" +
dirName, cfg.sparkMaster);
+
+ try {
+ new HoodieTTLJob(jssc, cfg).run();
+ } catch (Throwable throwable) {
+ throw new HoodieException("Failed to run ttl for " + cfg.basePath,
throwable);
+ } finally {
+ jssc.stop();
+ }
+
+ LOG.info("Hoodie TTL job ran successfully");
+ }
+
+}