This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 105f947c4de [HUDI-6382] Support hoodie-table-type changing in hudi-cli 
(#9937)
105f947c4de is described below

commit 105f947c4debe4372b2d1d249e0642d86fafc4d9
Author: kongwei <[email protected]>
AuthorDate: Sat Nov 4 13:38:30 2023 +0800

    [HUDI-6382] Support hoodie-table-type changing in hudi-cli (#9937)
---
 .../hudi/cli/commands/CompactionCommand.java       |  10 +-
 .../org/apache/hudi/cli/commands/TableCommand.java | 129 ++++++++++++
 .../apache/hudi/cli/integ/ITTestTableCommand.java  | 216 +++++++++++++++++++++
 .../common/testutils/HoodieTestDataGenerator.java  |  14 +-
 4 files changed, 365 insertions(+), 4 deletions(-)

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index bb335d7886e..fed07dfae3a 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -81,6 +81,10 @@ public class CompactionCommand {
 
   private static final String TMP_DIR = "/tmp/";
 
+  public static final String COMPACTION_SCH_SUCCESSFUL = "Attempted to 
schedule compaction for ";
+  public static final String COMPACTION_EXE_SUCCESSFUL = "Compaction 
successfully completed for ";
+  public static final String COMPACTION_SCH_EXE_SUCCESSFUL = "Schedule and 
execute compaction successfully completed";
+
   private HoodieTableMetaClient checkAndGetMetaClient() {
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
     if (client.getTableType() != HoodieTableType.MERGE_ON_READ) {
@@ -212,7 +216,7 @@ public class CompactionCommand {
     if (exitCode != 0) {
       return "Failed to run compaction for " + compactionInstantTime;
     }
-    return "Attempted to schedule compaction for " + compactionInstantTime;
+    return COMPACTION_SCH_SUCCESSFUL + compactionInstantTime;
   }
 
   @ShellMethod(key = "compaction run", value = "Run Compaction for given 
instant time")
@@ -261,7 +265,7 @@ public class CompactionCommand {
     if (exitCode != 0) {
       return "Failed to run compaction for " + compactionInstantTime;
     }
-    return "Compaction successfully completed for " + compactionInstantTime;
+    return COMPACTION_EXE_SUCCESSFUL + compactionInstantTime;
   }
 
   @ShellMethod(key = "compaction scheduleAndExecute", value = "Schedule 
compaction plan and execute this plan")
@@ -296,7 +300,7 @@ public class CompactionCommand {
     if (exitCode != 0) {
       return "Failed to schedule and execute compaction ";
     }
-    return "Schedule and execute compaction successfully completed";
+    return COMPACTION_SCH_EXE_SUCCESSFUL;
   }
 
   /**
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
index f0b653ec1e9..158c79f52a7 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
@@ -23,10 +23,18 @@ import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.HoodieTableHeaderFields;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.TableNotFoundException;
+import 
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.Path;
@@ -229,6 +237,127 @@ public class TableCommand {
     return renderOldNewProps(newProps, oldProps);
   }
 
+  @ShellMethod(key = "table change-table-type", value = "Change hudi table 
type to target type: COW or MOR. "
+      + "Note: before changing to COW, by default this command will execute 
all the pending compactions and execute a full compaction if needed.")
+  public String changeTableType(
+      @ShellOption(value = {"--target-type"},
+          help = "the target hoodie table type: COW or MOR") final String 
changeType,
+      @ShellOption(value = {"--enable-compaction"}, defaultValue = "true",
+          help = "Valid in MOR to COW case. Before changing to COW, need to 
perform a full compaction to compact all log files. Default true") final 
boolean enableCompaction,
+      @ShellOption(value = {"--parallelism"}, defaultValue = "3",
+          help = "Valid in MOR to COW case. Parallelism for hoodie 
compaction") final String parallelism,
+      @ShellOption(value = "--sparkMaster", defaultValue = "local",
+          help = "Valid in MOR to COW case. Spark Master") String master,
+      @ShellOption(value = "--sparkMemory", defaultValue = "4G",
+          help = "Valid in MOR to COW case. Spark executor memory") final 
String sparkMemory,
+      @ShellOption(value = "--retry", defaultValue = "1",
+          help = "Valid in MOR to COW case. Number of retries") final String 
retry,
+      @ShellOption(value = "--propsFilePath", defaultValue = "",
+          help = "Valid in MOR to COW case. path to properties file on localfs 
or dfs with configurations for hoodie client for compacting") final String 
propsFilePath,
+      @ShellOption(value = "--hoodieConfigs", defaultValue = "",
+          help = "Valid in MOR to COW case. Any configuration that can be set 
in the properties file can be passed here in the form of an array") final 
String[] configs)
+      throws Exception {
+    HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
+    String tableName = client.getTableConfig().getTableName();
+    String tablePath = client.getBasePathV2().toString();
+    Map<String, String> oldProps = client.getTableConfig().propsMap();
+
+    HoodieTableType currentType = client.getTableType();
+    String targetType;
+    switch (changeType) {
+      case "MOR":
+        if (currentType.equals(HoodieTableType.MERGE_ON_READ)) {
+          return String.format("table %s path %s is already %s", tableName, 
tablePath, HoodieTableType.MERGE_ON_READ);
+        }
+        targetType = HoodieTableType.MERGE_ON_READ.name();
+        break;
+      case "COW":
+        if (currentType.equals(HoodieTableType.COPY_ON_WRITE)) {
+          return String.format("table %s path %s is already %s", tableName, 
tablePath, HoodieTableType.COPY_ON_WRITE);
+        }
+        targetType = HoodieTableType.COPY_ON_WRITE.name();
+        if (!enableCompaction) {  // not enable compaction, have to check
+          HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
+          int compactionCount = activeTimeline.filter(instant -> 
instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)).countInstants();
+          if (compactionCount > 0) {
+            String errMsg = String.format("Remain %s compactions to compact, 
please set --enable-compaction=true", compactionCount);
+            LOG.error(errMsg);
+            throw new HoodieException(errMsg);
+          }
+          Option<HoodieInstant> lastInstant = 
client.getActiveTimeline().lastInstant();
+          if (lastInstant.isPresent()
+              && 
(!lastInstant.get().getAction().equals(HoodieTimeline.COMMIT_ACTION) || 
!lastInstant.get().isCompleted())) {
+            String errMsg = String.format("The last action must be a completed 
compaction(commit[COMPLETED]) for this operation. "
+                + "But is %s[status=%s]", lastInstant.get().getAction(), 
lastInstant.get().getState());
+            LOG.error(errMsg);
+            throw new HoodieException(errMsg);
+          }
+          break;
+        }
+
+        // compact all pending compactions.
+        List<HoodieInstant> pendingCompactionInstants = 
client.getActiveTimeline().filterPendingCompactionTimeline().getInstants();
+        LOG.info("Remain {} compaction instants to compact", 
pendingCompactionInstants.size());
+        for (int i = 0; i < pendingCompactionInstants.size(); i++) {
+          HoodieInstant compactionInstant = pendingCompactionInstants.get(i);
+          LOG.info("compact {} instant {}", i + 1, compactionInstant);
+          String result = new CompactionCommand().compact(parallelism, "", 
master, sparkMemory, retry, compactionInstant.getTimestamp(), propsFilePath, 
configs);
+          LOG.info("compact instant {} result: {}", compactionInstant, result);
+          if (!result.startsWith(CompactionCommand.COMPACTION_EXE_SUCCESSFUL)) 
{
+            throw new HoodieException(String.format("Compact %s failed", 
compactionInstant));
+          }
+        }
+        // refresh the timeline
+        client.reloadActiveTimeline();
+        Option<HoodieInstant> lastInstant = 
client.getActiveTimeline().lastInstant();
+        if (lastInstant.isPresent()) {
+          // before changing mor to cow, need to do a full compaction to merge 
all logfiles;
+          if 
(!lastInstant.get().getAction().equals(HoodieTimeline.COMMIT_ACTION) || 
!lastInstant.get().isCompleted()) {
+            LOG.info("There are remaining logfiles, will perform a full 
compaction");
+            boolean compactionStrategyExist = false;
+            boolean compactionNumDeltaExist = false;
+            List<String> newConfigs = new ArrayList<>();
+            for (int i = 0; i < configs.length; i++) {
+              if 
(configs[i].startsWith(HoodieCompactionConfig.COMPACTION_STRATEGY.key())) {
+                compactionStrategyExist = true;
+                configs[i] = String.format("%s=%s", 
HoodieCompactionConfig.COMPACTION_STRATEGY.key(), 
UnBoundedCompactionStrategy.class.getName());
+              }
+              if 
(configs[i].startsWith(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key()))
 {
+                compactionNumDeltaExist = true;
+                configs[i] = String.format("%s=%s", 
HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1");
+              }
+              newConfigs.add(configs[i]);
+            }
+            if (!compactionStrategyExist) {
+              newConfigs.add(String.format("%s=%s", 
HoodieCompactionConfig.COMPACTION_STRATEGY.key(), 
UnBoundedCompactionStrategy.class.getName()));
+            }
+            if (!compactionNumDeltaExist) {
+              newConfigs.add(String.format("%s=%s", 
HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1"));
+            }
+            String result = new CompactionCommand().compact(parallelism, "", 
master, sparkMemory, retry, propsFilePath, newConfigs.toArray(new String[0]));
+            LOG.info("Full compaction result: {}", result);
+            if 
(!result.equals(CompactionCommand.COMPACTION_SCH_EXE_SUCCESSFUL)) {
+              throw new HoodieException("Change table type to COW: schedule 
and execute the full compaction failed");
+            }
+          }
+        }
+        break;
+      default:
+        throw new HoodieException("Unsupported change type " + changeType + ". 
Only support MOR or COW.");
+    }
+
+    Properties updatedProps = new Properties();
+    updatedProps.putAll(oldProps);
+    // change the table type to target type
+    updatedProps.put(HoodieTableConfig.TYPE.key(), targetType);
+    Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME);
+    HoodieTableConfig.update(client.getFs(), metaPathDir, updatedProps);
+
+    HoodieCLI.refreshTableMetadata();
+    Map<String, String> newProps = 
HoodieCLI.getTableMetaClient().getTableConfig().propsMap();
+    return renderOldNewProps(newProps, oldProps);
+  }
+
   private static String renderOldNewProps(Map<String, String> newProps, 
Map<String, String> oldProps) {
     TreeSet<String> allPropKeys = new TreeSet<>();
     
allPropKeys.addAll(newProps.keySet().stream().map(Object::toString).collect(Collectors.toSet()));
diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestTableCommand.java 
b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestTableCommand.java
new file mode 100644
index 00000000000..f10ba576475
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestTableCommand.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.cli.testutils.HoodieCLIIntegrationTestBase;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.HoodieIndex;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.shell.Shell;
+
+import java.io.IOException;
+import java.util.List;
+
+import static 
org.apache.hudi.cli.commands.CompactionCommand.COMPACTION_SCH_SUCCESSFUL;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test class for {@link TableCommand#changeTableType}.
+ * <p/>
+ * A command use SparkLauncher need load jars under lib which generate during 
mvn package.
+ * Use integration test instead of unit test.
+ */
+@SpringBootTest(properties = {"spring.shell.interactive.enabled=false", 
"spring.shell.command.script.enabled=false"})
+public class ITTestTableCommand extends HoodieCLIIntegrationTestBase {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ITTestTableCommand.class);
+
+  @Autowired
+  private Shell shell;
+  private String tablePath;
+  private String tableName = "test_table";
+
+  @Test
+  public void testChangeTableCOW2MOR() throws IOException {
+    tablePath = basePath + Path.SEPARATOR + tableName + "_cow2mor";
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, 
"org.apache.hudi.common.model.HoodieAvroPayload");
+
+    HoodieTestDataGenerator.createCommitFile(tablePath, "100", 
jsc.hadoopConfiguration());
+
+    Object result = shell.evaluate(() -> "table change-table-type 
--target-type MOR");
+
+    assertEquals(String.class, result.getClass());
+    assertTrue(((String) result).matches("(?s).*║ hoodie.table.type +│ 
COPY_ON_WRITE +│ MERGE_ON_READ +║.*"));
+    assertEquals(HoodieTableType.MERGE_ON_READ, 
HoodieCLI.getTableMetaClient().getTableType());
+  }
+
+  @Test
+  public void testChangeTableMOR2COW() throws IOException {
+    tablePath = basePath + Path.SEPARATOR + tableName + "_mor2cow";
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.MERGE_ON_READ.name(),
+        "", TimelineLayoutVersion.VERSION_1, 
"org.apache.hudi.common.model.HoodieAvroPayload");
+
+    Object result = shell.evaluate(() -> "table change-table-type 
--target-type COW");
+
+    assertEquals(String.class, result.getClass());
+    assertTrue(((String) result).matches("(?s).*║ hoodie.table.type +│ 
MERGE_ON_READ +│ COPY_ON_WRITE +║.*"));
+    assertEquals(HoodieTableType.COPY_ON_WRITE, 
HoodieCLI.getTableMetaClient().getTableType());
+  }
+
+  @Test
+  public void testChangeTableMOR2COW_withPendingCompactions() throws Exception 
{
+    tablePath = basePath + Path.SEPARATOR + tableName + "_cow2mor";
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.MERGE_ON_READ.name(),
+        "", TimelineLayoutVersion.VERSION_1, 
"org.apache.hudi.common.model.HoodieAvroPayload");
+
+    generateCommits();
+    // schedule a compaction
+    Object scheduleResult = shell.evaluate(() -> "compaction schedule 
--hoodieConfigs hoodie.compact.inline.max.delta.commits=1 --sparkMaster local");
+    assertEquals(String.class, scheduleResult.getClass());
+    assertTrue(((String) 
scheduleResult).startsWith(COMPACTION_SCH_SUCCESSFUL));
+    Option<HoodieInstant> lastInstant = 
HoodieCLI.getTableMetaClient().getActiveTimeline().lastInstant();
+    assertTrue(lastInstant.isPresent());
+    assertEquals(HoodieTimeline.COMPACTION_ACTION, 
lastInstant.get().getAction());
+    assertTrue(lastInstant.get().isRequested());
+
+    generateCommits();
+    Object result = shell.evaluate(() -> "table change-table-type 
--target-type COW");
+
+    LOG.info("change to cow result \n{}", result);
+    assertEquals(String.class, result.getClass());
+    assertTrue(((String) result).matches("(?s).*║ hoodie.table.type +│ 
MERGE_ON_READ +│ COPY_ON_WRITE +║.*"));
+    // table change to cow type successfully
+    assertEquals(HoodieTableType.COPY_ON_WRITE, 
HoodieCLI.getTableMetaClient().getTableType());
+    lastInstant = 
HoodieCLI.getTableMetaClient().getActiveTimeline().lastInstant();
+    assertTrue(lastInstant.isPresent());
+    assertEquals(HoodieTimeline.COMMIT_ACTION, lastInstant.get().getAction());
+    assertTrue(lastInstant.get().isCompleted());
+  }
+
+  @Test
+  public void testChangeTableMOR2COW_withFullCompaction() throws Exception {
+    tablePath = basePath + Path.SEPARATOR + tableName + "_cow2mor";
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.MERGE_ON_READ.name(),
+        "", TimelineLayoutVersion.VERSION_1, 
"org.apache.hudi.common.model.HoodieAvroPayload");
+
+    generateCommits();
+    Object result = shell.evaluate(() -> "table change-table-type 
--target-type COW");
+
+    LOG.info("change to cow result \n{}", result);
+    assertEquals(String.class, result.getClass());
+    assertTrue(((String) result).matches("(?s).*║ hoodie.table.type +│ 
MERGE_ON_READ +│ COPY_ON_WRITE +║.*"));
+    // table change to cow type successfully
+    assertEquals(HoodieTableType.COPY_ON_WRITE, 
HoodieCLI.getTableMetaClient().getTableType());
+    HoodieActiveTimeline activeTimeline = 
HoodieCLI.getTableMetaClient().getActiveTimeline();
+    // no compaction left
+    assertTrue(activeTimeline.filter(instant -> 
instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)).empty());
+    Option<HoodieInstant> lastInstant = activeTimeline.lastInstant();
+    assertTrue(lastInstant.isPresent());
+    assertEquals(HoodieTimeline.COMMIT_ACTION, lastInstant.get().getAction());
+    assertTrue(lastInstant.get().isCompleted());
+  }
+
+  @Test
+  public void testChangeTableMOR2COW_withoutCompaction() throws Exception {
+    tablePath = basePath + Path.SEPARATOR + tableName + "_cow2mor";
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.MERGE_ON_READ.name(),
+        "", TimelineLayoutVersion.VERSION_1, 
"org.apache.hudi.common.model.HoodieAvroPayload");
+
+    generateCommits();
+    Object result = shell.evaluate(() -> "table change-table-type 
--target-type COW --enable-compaction false");
+
+    LOG.info("change to cow result \n{}", result);
+    assertEquals(HoodieException.class, result.getClass());
+    assertTrue(((HoodieException) result).getMessage().startsWith("The last 
action must be a completed compaction"));
+    // table change to cow type failed
+    assertEquals(HoodieTableType.MERGE_ON_READ, 
HoodieCLI.getTableMetaClient().getTableType());
+  }
+
+  private void generateCommits() throws IOException {
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+
+    // Create the write client to write some records in
+    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
+        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2)
+        .withDeleteParallelism(2)
+        .forTable(tableName)
+        
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
+
+    try (SparkRDDWriteClient<HoodieAvroPayload> client = new 
SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg)) {
+      String instantTime = client.createNewInstantTime();
+      List<HoodieRecord> records = dataGen.generateInserts(instantTime, 2);
+      upsert(jsc, client, records, instantTime);
+
+      instantTime = client.createNewInstantTime();
+      List<HoodieRecord> recordsToUpdate = 
dataGen.generateUpdates(instantTime, 2);
+      records.addAll(recordsToUpdate);
+      upsert(jsc, client, records, instantTime);
+    }
+  }
+
+  private void upsert(JavaSparkContext jsc, 
SparkRDDWriteClient<HoodieAvroPayload> client,
+                      List<HoodieRecord> records, String newCommitTime) throws 
IOException {
+    client.startCommitWithTime(newCommitTime);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    operateFunc(SparkRDDWriteClient::upsert, client, writeRecords, 
newCommitTime);
+  }
+
+  private void operateFunc(
+      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, 
JavaRDD<HoodieRecord>, String> writeFn,
+      SparkRDDWriteClient<HoodieAvroPayload> client, JavaRDD<HoodieRecord> 
writeRecords, String commitTime)
+      throws IOException {
+    writeFn.apply(client, writeRecords, commitTime);
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index fbc8c0a29d6..8dc53b65edb 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -519,7 +519,19 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
 
   private static void createCommitFile(String basePath, String instantTime, 
Configuration configuration, HoodieCommitMetadata commitMetadata) {
     Arrays.asList(HoodieTimeline.makeCommitFileName(instantTime + "_" + 
InProcessTimeGenerator.createNewInstantTime()), 
HoodieTimeline.makeInflightCommitFileName(instantTime),
-        HoodieTimeline.makeRequestedCommitFileName(instantTime))
+            HoodieTimeline.makeRequestedCommitFileName(instantTime))
+        .forEach(f -> createMetadataFile(f, basePath, configuration, 
commitMetadata));
+  }
+
+  public static void createDeltaCommitFile(String basePath, String 
instantTime, Configuration configuration) {
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    createDeltaCommitFile(basePath, instantTime, configuration, 
commitMetadata);
+  }
+
+  private static void createDeltaCommitFile(String basePath, String 
instantTime, Configuration configuration, HoodieCommitMetadata commitMetadata) {
+    Arrays.asList(HoodieTimeline.makeDeltaFileName(instantTime + "_" + 
InProcessTimeGenerator.createNewInstantTime()),
+            HoodieTimeline.makeInflightDeltaFileName(instantTime),
+            HoodieTimeline.makeRequestedDeltaFileName(instantTime))
         .forEach(f -> createMetadataFile(f, basePath, configuration, 
commitMetadata));
   }
 

Reply via email to