This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 95a0df398c7 [HUDI-7397] Adding support to purge pending clustering instant (#10645)
95a0df398c7 is described below
commit 95a0df398c75c81b2f61dccab84f313013e4a44d
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat Feb 10 22:16:18 2024 -0800
[HUDI-7397] Adding support to purge pending clustering instant (#10645)
---
.../hudi/client/BaseHoodieTableServiceClient.java | 12 +++
.../apache/hudi/client/BaseHoodieWriteClient.java | 6 ++
.../java/org/apache/hudi/table/HoodieTable.java | 15 +++
.../apache/hudi/utilities/HoodieClusteringJob.java | 14 +++
.../org/apache/hudi/utilities/UtilHelpers.java | 1 +
.../offlinejob/TestHoodieClusteringJob.java | 109 +++++++++++++++++----
6 files changed, 139 insertions(+), 18 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 5b54151dc4c..967aaa4f68e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -472,6 +472,18 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
return clusteringMetadata;
}
+  public boolean purgePendingClustering(String clusteringInstant) {
+    HoodieTable<?, I, ?, T> table = createTable(config, context.getHadoopConf().get());
+    HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
+    HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
+    if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
+      table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), true);
+      table.getMetaClient().reloadActiveTimeline();
+      return true;
+    }
+    return false;
+  }
+
   protected abstract void validateClusteringCommit(HoodieWriteMetadata<O> clusteringMetadata, String clusteringCommitTime, HoodieTable table);
   protected abstract HoodieWriteMetadata<O> convertToOutputMetadata(HoodieWriteMetadata<T> writeMetadata);
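For reference (not part of this commit), a minimal sketch of inspecting the pending clustering timeline that purgePendingClustering(...) consults; it assumes an initialized HoodieTableMetaClient named metaClient, with imports matching the types used in the patch above:

    // List pending clustering (replace-commit) instants; the purge only acts
    // when the inflight instant for the given time is present here.
    HoodieTimeline pendingClusteringTimeline = metaClient.getActiveTimeline().filterPendingReplaceTimeline();
    pendingClusteringTimeline.getInstants().forEach(instant ->
        System.out.println(instant.getTimestamp() + " (" + instant.getState() + ")"));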
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 6bed4a6fe6b..9b69d819e71 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1198,6 +1198,12 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
return tableServiceClient.cluster(clusteringInstant, shouldComplete);
}
+  public boolean purgePendingClustering(String clusteringInstant) {
+    HoodieTable table = createTable(config, context.getHadoopConf().get());
+    preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient());
+    return tableServiceClient.purgePendingClustering(clusteringInstant);
+  }
+
/**
* Schedule table services such as clustering, compaction & cleaning.
*
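A minimal usage sketch of the new write-client entry point (not part of this commit; context, config, and the instant time are placeholders for an existing engine context, write config, and a pending clustering instant time):

    try (SparkRDDWriteClient client = new SparkRDDWriteClient(context, config)) {
      // returns true if the inflight clustering instant was found and purged,
      // false if it is not present in the pending replace timeline
      boolean purged = client.purgePendingClustering("20240210221618000");
    }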
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index c617402d577..080fe5f357d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -627,8 +627,23 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
*/
   public void rollbackInflightClustering(HoodieInstant inflightInstant,
                                          Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
+    rollbackInflightClustering(inflightInstant, getPendingRollbackInstantFunc, false);
+  }
+
+  /**
+   * Rollback inflight clustering instant to requested clustering instant
+   *
+   * @param inflightInstant               Inflight clustering instant
+   * @param getPendingRollbackInstantFunc Function to get rollback instant
+   * @param deleteInstants                When true, also delete the requested instant so the pending clustering is purged entirely
+   */
+  public void rollbackInflightClustering(HoodieInstant inflightInstant,
+                                         Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc, boolean deleteInstants) {
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
+    if (deleteInstants) {
+      // the rollback above still keeps the requested instant in the timeline, so delete it as well when purging the pending clustering fully.
+      getActiveTimeline().deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, inflightInstant.getAction(), inflightInstant.getTimestamp()));
+    }
}
/**
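For orientation (a sketch, not part of this commit), the purge path removes both pending timeline files of the clustering instant, assuming the standard .hoodie timeline layout; the instant time below is a placeholder:

    // <instantTime>.replacecommit.inflight  is removed by rollbackInflightClustering(...);
    // <instantTime>.replacecommit.requested is removed by the explicit deletePending(...) call.
    HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED,
        HoodieTimeline.REPLACE_COMMIT_ACTION, "20240210221618000");
    table.getActiveTimeline().deletePending(requested);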
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index ef7de13b34f..cb267856ef0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -45,6 +45,7 @@ import java.util.Date;
import java.util.List;
import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.PURGE_PENDING_INSTANT;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
@@ -192,6 +193,10 @@ public class HoodieClusteringJob {
LOG.info("Running Mode: [" + EXECUTE + "]; Do cluster");
return doCluster(jsc);
}
+ case PURGE_PENDING_INSTANT: {
+ LOG.info("Running Mode: [" + PURGE_PENDING_INSTANT + "];");
+ return doPurgePendingInstant(jsc);
+ }
default: {
        LOG.error("Unsupported running mode [" + cfg.runningMode + "], quit the job directly");
return -1;
@@ -282,6 +287,15 @@ public class HoodieClusteringJob {
}
}
+  private int doPurgePendingInstant(JavaSparkContext jsc) throws Exception {
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
+    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      client.purgePendingClustering(cfg.clusteringInstantTime);
+    }
+    return 0;
+  }
+
private void clean(SparkRDDWriteClient<?> client) {
if (client.getConfig().isAutoClean()) {
client.clean();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index c5c01e5856f..18e92a8463c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -122,6 +122,7 @@ public class UtilHelpers {
public static final String EXECUTE = "execute";
public static final String SCHEDULE = "schedule";
public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
+ public static final String PURGE_PENDING_INSTANT = "purge_pending_instant";
private static final Logger LOG = LoggerFactory.getLogger(UtilHelpers.class);
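A sketch of driving the new running mode programmatically (mirrors the test changes below; the base path and instant time are placeholders):

    HoodieClusteringJob.Config cfg = new HoodieClusteringJob.Config();
    cfg.basePath = "/tmp/hudi/my_table";                  // hypothetical table path
    cfg.runningMode = UtilHelpers.PURGE_PENDING_INSTANT;  // "purge_pending_instant"
    cfg.clusteringInstantTime = "20240210221618000";      // pending clustering instant to purge
    int exitCode = new HoodieClusteringJob(jsc, cfg).cluster(0);  // 0 on success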
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
index f12430dc266..b46a790f328 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
@@ -25,42 +25,34 @@ import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.util.Properties;
+import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.utilities.UtilHelpers.PURGE_PENDING_INSTANT;
+import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.deleteFileFromDfs;
+import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Test cases for {@link HoodieClusteringJob}.
*/
public class TestHoodieClusteringJob extends HoodieOfflineJobTestBase {
+
@Test
public void testHoodieClusteringJobWithClean() throws Exception {
String tableBasePath = basePath + "/asyncClustering";
Properties props = getPropertiesForKeyGen(true);
-    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
-        .forTable("asyncClustering")
-        .withPath(tableBasePath)
-        .withSchema(TRIP_EXAMPLE_SCHEMA)
-        .withParallelism(2, 2)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
-        .withAutoCommit(false)
-        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
-            .withInlineClustering(false)
-            .withScheduleInlineClustering(false)
-            .withAsyncClustering(false).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder()
-            .logFileMaxSize(1024).build())
-        .withCleanConfig(HoodieCleanConfig.newBuilder()
-            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
-            .withAutoClean(false).withAsyncClean(false).build())
-        .build();
+    HoodieWriteConfig config = getWriteConfig(tableBasePath);
props.putAll(config.getProps());
Properties metaClientProps = HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
@@ -68,7 +60,7 @@ public class TestHoodieClusteringJob extends HoodieOfflineJobTestBase {
.fromProperties(props)
.build();
-    metaClient = HoodieTableMetaClient.initTableAndGetMetaClient(jsc.hadoopConfiguration(), tableBasePath, metaClientProps);
+    metaClient = HoodieTableMetaClient.initTableAndGetMetaClient(jsc.hadoopConfiguration(), tableBasePath, metaClientProps);
client = new SparkRDDWriteClient(context, config);
writeData(false, client.createNewInstantTime(), 100, true);
@@ -92,6 +84,58 @@ public class TestHoodieClusteringJob extends HoodieOfflineJobTestBase {
     HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(1, tableBasePath, fs);
}
+  @Test
+  public void testPurgePendingInstants() throws Exception {
+    String tableBasePath = basePath + "/purgePendingClustering";
+    Properties props = getPropertiesForKeyGen(true);
+    HoodieWriteConfig config = getWriteConfig(tableBasePath);
+    props.putAll(config.getProps());
+    Properties metaClientProps = HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE)
+        .setPayloadClass(HoodieAvroPayload.class)
+        .fromProperties(props)
+        .build();
+
+    metaClient = HoodieTableMetaClient.initTableAndGetMetaClient(jsc.hadoopConfiguration(), tableBasePath, metaClientProps);
+    client = new SparkRDDWriteClient(context, config);
+
+    writeData(false, client.createNewInstantTime(), 100, true);
+    writeData(false, client.createNewInstantTime(), 100, true);
+
+    // offline clustering execute without clean
+    HoodieClusteringJob hoodieCluster =
+        init(tableBasePath, true, "scheduleAndExecute", false);
+    hoodieCluster.cluster(0);
+    HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(1, tableBasePath, fs);
+    HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(0, tableBasePath, fs);
+
+    // remove the completed instant from timeline and trigger purge of pending clustering instant.
+    HoodieInstant latestClusteringInstant = metaClient.getActiveTimeline()
+        .filterCompletedInstantsOrRewriteTimeline().getCompletedReplaceTimeline().getInstants().get(0);
+    String completedFilePath = tableBasePath + "/" + METAFOLDER_NAME + "/" + latestClusteringInstant.getFileName();
+    deleteFileFromDfs(fs, completedFilePath);
+
+    // trigger purge.
+    hoodieCluster =
+        getClusteringConfigForPurge(tableBasePath, true, PURGE_PENDING_INSTANT, false, latestClusteringInstant.getTimestamp());
+    hoodieCluster.cluster(0);
+    // validate that there are no clustering commits in timeline.
+    HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(0, tableBasePath, fs);
+
+    // validate that no records match the clustering instant.
+    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    for (int i = 0; i < fullPartitionPaths.length; i++) {
+      fullPartitionPaths[i] = String.format("%s/%s/*", tableBasePath, dataGen.getPartitionPaths()[i]);
+    }
+    assertEquals(0, HoodieClientTestUtils.read(jsc, tableBasePath, sqlContext, fs, fullPartitionPaths).filter("_hoodie_commit_time = " + latestClusteringInstant.getTimestamp()).count(),
+        "Must not contain any records w/ clustering instant time");
+  }
+
+  private void deleteCommitMetaFile(String instantTime, String suffix) throws IOException {
+    String targetPath = basePath + "/" + METAFOLDER_NAME + "/" + instantTime + suffix;
+    deleteFileFromDfs(fs, targetPath);
+  }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
@@ -102,6 +146,14 @@ public class TestHoodieClusteringJob extends HoodieOfflineJobTestBase {
return new HoodieClusteringJob(jsc, clusterConfig);
}
+  private HoodieClusteringJob getClusteringConfigForPurge(String tableBasePath, boolean runSchedule, String scheduleAndExecute, boolean isAutoClean,
+                                                          String pendingInstant) {
+    HoodieClusteringJob.Config clusterConfig = buildHoodieClusteringUtilConfig(tableBasePath, runSchedule, scheduleAndExecute, isAutoClean);
+    clusterConfig.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));
+    clusterConfig.clusteringInstantTime = pendingInstant;
+    return new HoodieClusteringJob(jsc, clusterConfig);
+  }
+
   private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath, boolean runSchedule, String runningMode, boolean isAutoClean) {
HoodieClusteringJob.Config config = new HoodieClusteringJob.Config();
config.basePath = basePath;
@@ -113,4 +165,25 @@ public class TestHoodieClusteringJob extends HoodieOfflineJobTestBase {
     config.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), 1));
return config;
}
+
+  private HoodieWriteConfig getWriteConfig(String tableBasePath) {
+    return HoodieWriteConfig.newBuilder()
+        .forTable("asyncClustering")
+        .withPath(tableBasePath)
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withAutoCommit(false)
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            .withInlineClustering(false)
+            .withScheduleInlineClustering(false)
+            .withAsyncClustering(false).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder()
+            .logFileMaxSize(1024).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .withAutoClean(false).withAsyncClean(false).build())
+        .build();
+  }
+
}