This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 95a0df398c7 [HUDI-7397] Adding support to purge pending clustering instant (#10645)
95a0df398c7 is described below

commit 95a0df398c75c81b2f61dccab84f313013e4a44d
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat Feb 10 22:16:18 2024 -0800

    [HUDI-7397] Adding support to purge pending clustering instant (#10645)
---
 .../hudi/client/BaseHoodieTableServiceClient.java  |  12 +++
 .../apache/hudi/client/BaseHoodieWriteClient.java  |   6 ++
 .../java/org/apache/hudi/table/HoodieTable.java    |  15 +++
 .../apache/hudi/utilities/HoodieClusteringJob.java |  14 +++
 .../org/apache/hudi/utilities/UtilHelpers.java     |   1 +
 .../offlinejob/TestHoodieClusteringJob.java        | 109 +++++++++++++++++----
 6 files changed, 139 insertions(+), 18 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 5b54151dc4c..967aaa4f68e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -472,6 +472,18 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
     return clusteringMetadata;
   }
 
+  public boolean purgePendingClustering(String clusteringInstant) {
+    HoodieTable<?, I, ?, T> table = createTable(config, context.getHadoopConf().get());
+    HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
+    HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
+    if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
+      table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), true);
+      table.getMetaClient().reloadActiveTimeline();
+      return true;
+    }
+    return false;
+  }
+
   protected abstract void validateClusteringCommit(HoodieWriteMetadata<O> clusteringMetadata, String clusteringCommitTime, HoodieTable table);
 
   protected abstract HoodieWriteMetadata<O> convertToOutputMetadata(HoodieWriteMetadata<T> writeMetadata);
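
A note on the semantics above: purgePendingClustering is a no-op returning false when the given instant is no longer in the pending replace-commit timeline, so callers can branch on the result. A minimal hedged sketch (writeClient and the instant timestamp are illustrative, not from this patch):

    boolean purged = writeClient.purgePendingClustering("20240210221618000");
    if (!purged) {
      // nothing pending at that instant: it may have completed or been purged already
    }
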
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 6bed4a6fe6b..9b69d819e71 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1198,6 +1198,12 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
     return tableServiceClient.cluster(clusteringInstant, shouldComplete);
   }
 
+  public boolean purgePendingClustering(String clusteringInstant) {
+    HoodieTable table = createTable(config, context.getHadoopConf().get());
+    preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient());
+    return tableServiceClient.purgePendingClustering(clusteringInstant);
+  }
+
   /**
    * Schedule table services such as clustering, compaction & cleaning.
    *
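
For programmatic callers, the same purge is exposed on the write client. A minimal hedged sketch, mirroring the try-with-resources pattern this patch uses in HoodieClusteringJob below (context, writeConfig, and pendingInstantTime are assumed to be set up elsewhere):

    // Hedged sketch: purge a pending clustering instant via the Spark write client.
    try (SparkRDDWriteClient<HoodieRecordPayload> client = new SparkRDDWriteClient<>(context, writeConfig)) {
      client.purgePendingClustering(pendingInstantTime); // e.g. a timestamp like "20240210221618000"
    }
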
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index c617402d577..080fe5f357d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -627,8 +627,23 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
    */
   public void rollbackInflightClustering(HoodieInstant inflightInstant,
                                          Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
+    rollbackInflightClustering(inflightInstant, getPendingRollbackInstantFunc, false);
+  }
+
+  /**
+   * Rolls back an inflight clustering instant to the requested clustering instant.
+   *
+   * @param inflightInstant               Inflight clustering instant
+   * @param getPendingRollbackInstantFunc Function to get rollback instant
+   * @param deleteInstants                Whether to also delete the requested instant, fully purging the pending clustering
+   */
+  public void rollbackInflightClustering(HoodieInstant inflightInstant,
+                                         Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc, boolean deleteInstants) {
     ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
     rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
+    if (deleteInstants) {
+      // the rollback above still keeps the requested instant in the timeline, so delete it if we are looking to purge the pending clustering fully.
+      getActiveTimeline().deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, inflightInstant.getAction(), inflightInstant.getTimestamp()));
+    }
   }
 
   /**
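
For context on the deleteInstants flag: a clustering (replace commit) instant with timestamp t can leave up to three meta files under the table's .hoodie folder, roughly:

    t.replacecommit.requested   (plan scheduled)
    t.replacecommit.inflight    (execution started)
    t.replacecommit             (completed)

Rolling back an inflight clustering removes the inflight file but keeps the requested plan so the clustering can be re-attempted later; deleteInstants=true additionally deletes the requested file, which is what fully purges the pending clustering.
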
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index ef7de13b34f..cb267856ef0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -45,6 +45,7 @@ import java.util.Date;
 import java.util.List;
 
 import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.PURGE_PENDING_INSTANT;
 import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
 import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
 
@@ -192,6 +193,10 @@ public class HoodieClusteringJob {
           LOG.info("Running Mode: [" + EXECUTE + "]; Do cluster");
           return doCluster(jsc);
         }
+        case PURGE_PENDING_INSTANT: {
+          LOG.info("Running Mode: [" + PURGE_PENDING_INSTANT + "];");
+          return doPurgePendingInstant(jsc);
+        }
         default: {
           LOG.error("Unsupported running mode [" + cfg.runningMode + "], quit the job directly");
           return -1;
@@ -282,6 +287,15 @@ public class HoodieClusteringJob {
     }
   }
 
+  private int doPurgePendingInstant(JavaSparkContext jsc) throws Exception {
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
+    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      client.purgePendingClustering(cfg.clusteringInstantTime);
+    }
+    return 0;
+  }
+
   private void clean(SparkRDDWriteClient<?> client) {
     if (client.getConfig().isAutoClean()) {
       client.clean();
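
The new running mode is selected through the job's --mode flag together with --instant-time for the instant to purge. A hedged spark-submit sketch (bundle name, version, and paths are placeholders; the flags assume HoodieClusteringJob's existing options):

    spark-submit \
      --class org.apache.hudi.utilities.HoodieClusteringJob \
      hudi-utilities-bundle_2.12-<version>.jar \
      --base-path /path/to/hudi/table \
      --table-name my_table \
      --mode purge_pending_instant \
      --instant-time 20240210221618000
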
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index c5c01e5856f..18e92a8463c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -122,6 +122,7 @@ public class UtilHelpers {
   public static final String EXECUTE = "execute";
   public static final String SCHEDULE = "schedule";
   public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
+  public static final String PURGE_PENDING_INSTANT = "purge_pending_instant";
 
   private static final Logger LOG = LoggerFactory.getLogger(UtilHelpers.class);
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
index f12430dc266..b46a790f328 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
@@ -25,42 +25,34 @@ import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
 import org.apache.hudi.utilities.HoodieClusteringJob;
 
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.util.Properties;
 
+import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.utilities.UtilHelpers.PURGE_PENDING_INSTANT;
+import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.deleteFileFromDfs;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
  * Test cases for {@link HoodieClusteringJob}.
  */
 public class TestHoodieClusteringJob extends HoodieOfflineJobTestBase {
+
   @Test
   public void testHoodieClusteringJobWithClean() throws Exception {
     String tableBasePath = basePath + "/asyncClustering";
     Properties props = getPropertiesForKeyGen(true);
-    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
-        .forTable("asyncClustering")
-        .withPath(tableBasePath)
-        .withSchema(TRIP_EXAMPLE_SCHEMA)
-        .withParallelism(2, 2)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
-        .withAutoCommit(false)
-        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
-          .withInlineClustering(false)
-          .withScheduleInlineClustering(false)
-          .withAsyncClustering(false).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder()
-          .logFileMaxSize(1024).build())
-        .withCleanConfig(HoodieCleanConfig.newBuilder()
-          .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
-          .withAutoClean(false).withAsyncClean(false).build())
-        .build();
+    HoodieWriteConfig config = getWriteConfig(tableBasePath);
     props.putAll(config.getProps());
     Properties metaClientProps = HoodieTableMetaClient.withPropertyBuilder()
         .setTableType(HoodieTableType.COPY_ON_WRITE)
@@ -68,7 +60,7 @@ public class TestHoodieClusteringJob extends HoodieOfflineJobTestBase {
         .fromProperties(props)
         .build();
 
-    metaClient =  HoodieTableMetaClient.initTableAndGetMetaClient(jsc.hadoopConfiguration(), tableBasePath, metaClientProps);
+    metaClient = HoodieTableMetaClient.initTableAndGetMetaClient(jsc.hadoopConfiguration(), tableBasePath, metaClientProps);
     client = new SparkRDDWriteClient(context, config);
 
     writeData(false, client.createNewInstantTime(), 100, true);
@@ -92,6 +84,58 @@ public class TestHoodieClusteringJob extends HoodieOfflineJobTestBase {
     HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(1, tableBasePath, fs);
   }
 
+  @Test
+  public void testPurgePendingInstants() throws Exception {
+    String tableBasePath = basePath + "/purgePendingClustering";
+    Properties props = getPropertiesForKeyGen(true);
+    HoodieWriteConfig config = getWriteConfig(tableBasePath);
+    props.putAll(config.getProps());
+    Properties metaClientProps = HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE)
+        .setPayloadClass(HoodieAvroPayload.class)
+        .fromProperties(props)
+        .build();
+
+    metaClient = HoodieTableMetaClient.initTableAndGetMetaClient(jsc.hadoopConfiguration(), tableBasePath, metaClientProps);
+    client = new SparkRDDWriteClient(context, config);
+
+    writeData(false, client.createNewInstantTime(), 100, true);
+    writeData(false, client.createNewInstantTime(), 100, true);
+
+    // execute offline clustering without clean
+    HoodieClusteringJob hoodieCluster =
+        init(tableBasePath, true, "scheduleAndExecute", false);
+    hoodieCluster.cluster(0);
+    HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(1, tableBasePath, fs);
+    HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(0, tableBasePath, fs);
+
+    // remove the completed instant from the timeline and trigger purge of the pending clustering instant.
+    HoodieInstant latestClusteringInstant = metaClient.getActiveTimeline()
+        .filterCompletedInstantsOrRewriteTimeline().getCompletedReplaceTimeline().getInstants().get(0);
+    String completedFilePath = tableBasePath + "/" + METAFOLDER_NAME + "/" + latestClusteringInstant.getFileName();
+    deleteFileFromDfs(fs, completedFilePath);
+
+    // trigger purge.
+    hoodieCluster =
+        getClusteringConfigForPurge(tableBasePath, true, PURGE_PENDING_INSTANT, false, latestClusteringInstant.getTimestamp());
+    hoodieCluster.cluster(0);
+    // validate that there are no clustering commits in the timeline.
+    HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(0, tableBasePath, fs);
+
+    // validate that no records match the clustering instant.
+    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    for (int i = 0; i < fullPartitionPaths.length; i++) {
+      fullPartitionPaths[i] = String.format("%s/%s/*", tableBasePath, dataGen.getPartitionPaths()[i]);
+    }
+    assertEquals(0, HoodieClientTestUtils.read(jsc, tableBasePath, sqlContext, fs, fullPartitionPaths).filter("_hoodie_commit_time = " + latestClusteringInstant.getTimestamp()).count(),
+        "Must not contain any records w/ clustering instant time");
+  }
+
+  private void deleteCommitMetaFile(String instantTime, String suffix) throws IOException {
+    String targetPath = basePath + "/" + METAFOLDER_NAME + "/" + instantTime + suffix;
+    deleteFileFromDfs(fs, targetPath);
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
@@ -102,6 +146,14 @@ public class TestHoodieClusteringJob extends HoodieOfflineJobTestBase {
     return new HoodieClusteringJob(jsc, clusterConfig);
   }
 
+  private HoodieClusteringJob getClusteringConfigForPurge(String tableBasePath, boolean runSchedule, String scheduleAndExecute, boolean isAutoClean,
+                                                          String pendingInstant) {
+    HoodieClusteringJob.Config clusterConfig = buildHoodieClusteringUtilConfig(tableBasePath, runSchedule, scheduleAndExecute, isAutoClean);
+    clusterConfig.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));
+    clusterConfig.clusteringInstantTime = pendingInstant;
+    return new HoodieClusteringJob(jsc, clusterConfig);
+  }
+
   private HoodieClusteringJob.Config  buildHoodieClusteringUtilConfig(String basePath, boolean runSchedule, String runningMode, boolean isAutoClean) {
     HoodieClusteringJob.Config config = new HoodieClusteringJob.Config();
     config.basePath = basePath;
@@ -113,4 +165,25 @@ public class TestHoodieClusteringJob extends HoodieOfflineJobTestBase {
     config.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), 1));
     return config;
   }
+
+  private HoodieWriteConfig getWriteConfig(String tableBasePath) {
+    return HoodieWriteConfig.newBuilder()
+        .forTable("asyncClustering")
+        .withPath(tableBasePath)
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withAutoCommit(false)
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            .withInlineClustering(false)
+            .withScheduleInlineClustering(false)
+            .withAsyncClustering(false).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder()
+            .logFileMaxSize(1024).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .withAutoClean(false).withAsyncClean(false).build())
+        .build();
+  }
+
 }
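
Worth noting about the test strategy: rather than interrupting a clustering mid-flight, the test produces a pending clustering by completing one normally and then deleting its completed replacecommit meta file from the .hoodie folder, leaving the requested and inflight instants behind. The purge then removes those, and the final read asserts that no rows carry the purged instant's _hoodie_commit_time.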
