This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 105f947c4de [HUDI-6382] Support hoodie-table-type changing in hudi-cli
(#9937)
105f947c4de is described below
commit 105f947c4debe4372b2d1d249e0642d86fafc4d9
Author: kongwei <[email protected]>
AuthorDate: Sat Nov 4 13:38:30 2023 +0800
[HUDI-6382] Support hoodie-table-type changing in hudi-cli (#9937)
---
.../hudi/cli/commands/CompactionCommand.java | 10 +-
.../org/apache/hudi/cli/commands/TableCommand.java | 129 ++++++++++++
.../apache/hudi/cli/integ/ITTestTableCommand.java | 216 +++++++++++++++++++++
.../common/testutils/HoodieTestDataGenerator.java | 14 +-
4 files changed, 365 insertions(+), 4 deletions(-)
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index bb335d7886e..fed07dfae3a 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -81,6 +81,10 @@ public class CompactionCommand {
private static final String TMP_DIR = "/tmp/";
+ public static final String COMPACTION_SCH_SUCCESSFUL = "Attempted to
schedule compaction for ";
+ public static final String COMPACTION_EXE_SUCCESSFUL = "Compaction
successfully completed for ";
+ public static final String COMPACTION_SCH_EXE_SUCCESSFUL = "Schedule and
execute compaction successfully completed";
+
private HoodieTableMetaClient checkAndGetMetaClient() {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
if (client.getTableType() != HoodieTableType.MERGE_ON_READ) {
@@ -212,7 +216,7 @@ public class CompactionCommand {
if (exitCode != 0) {
return "Failed to run compaction for " + compactionInstantTime;
}
- return "Attempted to schedule compaction for " + compactionInstantTime;
+ return COMPACTION_SCH_SUCCESSFUL + compactionInstantTime;
}
@ShellMethod(key = "compaction run", value = "Run Compaction for given
instant time")
@@ -261,7 +265,7 @@ public class CompactionCommand {
if (exitCode != 0) {
return "Failed to run compaction for " + compactionInstantTime;
}
- return "Compaction successfully completed for " + compactionInstantTime;
+ return COMPACTION_EXE_SUCCESSFUL + compactionInstantTime;
}
@ShellMethod(key = "compaction scheduleAndExecute", value = "Schedule
compaction plan and execute this plan")
@@ -296,7 +300,7 @@ public class CompactionCommand {
if (exitCode != 0) {
return "Failed to schedule and execute compaction ";
}
- return "Schedule and execute compaction successfully completed";
+ return COMPACTION_SCH_EXE_SUCCESSFUL;
}
/**
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
index f0b653ec1e9..158c79f52a7 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
@@ -23,10 +23,18 @@ import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
+import
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
@@ -229,6 +237,127 @@ public class TableCommand {
return renderOldNewProps(newProps, oldProps);
}
+ @ShellMethod(key = "table change-table-type", value = "Change hudi table
type to target type: COW or MOR. "
+ + "Note: before changing to COW, by default this command will execute
all the pending compactions and execute a full compaction if needed.")
+ public String changeTableType(
+ @ShellOption(value = {"--target-type"},
+ help = "the target hoodie table type: COW or MOR") final String
changeType,
+ @ShellOption(value = {"--enable-compaction"}, defaultValue = "true",
+ help = "Valid in MOR to COW case. Before changing to COW, need to
perform a full compaction to compact all log files. Default true") final
boolean enableCompaction,
+ @ShellOption(value = {"--parallelism"}, defaultValue = "3",
+ help = "Valid in MOR to COW case. Parallelism for hoodie
compaction") final String parallelism,
+ @ShellOption(value = "--sparkMaster", defaultValue = "local",
+ help = "Valid in MOR to COW case. Spark Master") String master,
+ @ShellOption(value = "--sparkMemory", defaultValue = "4G",
+ help = "Valid in MOR to COW case. Spark executor memory") final
String sparkMemory,
+ @ShellOption(value = "--retry", defaultValue = "1",
+ help = "Valid in MOR to COW case. Number of retries") final String
retry,
+ @ShellOption(value = "--propsFilePath", defaultValue = "",
+ help = "Valid in MOR to COW case. path to properties file on localfs
or dfs with configurations for hoodie client for compacting") final String
propsFilePath,
+ @ShellOption(value = "--hoodieConfigs", defaultValue = "",
+ help = "Valid in MOR to COW case. Any configuration that can be set
in the properties file can be passed here in the form of an array") final
String[] configs)
+ throws Exception {
+ HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
+ String tableName = client.getTableConfig().getTableName();
+ String tablePath = client.getBasePathV2().toString();
+ Map<String, String> oldProps = client.getTableConfig().propsMap();
+
+ HoodieTableType currentType = client.getTableType();
+ String targetType;
+ switch (changeType) {
+ case "MOR":
+ if (currentType.equals(HoodieTableType.MERGE_ON_READ)) {
+ return String.format("table %s path %s is already %s", tableName,
tablePath, HoodieTableType.MERGE_ON_READ);
+ }
+ targetType = HoodieTableType.MERGE_ON_READ.name();
+ break;
+ case "COW":
+ if (currentType.equals(HoodieTableType.COPY_ON_WRITE)) {
+ return String.format("table %s path %s is already %s", tableName,
tablePath, HoodieTableType.COPY_ON_WRITE);
+ }
+ targetType = HoodieTableType.COPY_ON_WRITE.name();
+ if (!enableCompaction) { // not enable compaction, have to check
+ HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
+ int compactionCount = activeTimeline.filter(instant ->
instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)).countInstants();
+ if (compactionCount > 0) {
+ String errMsg = String.format("Remain %s compactions to compact,
please set --enable-compaction=true", compactionCount);
+ LOG.error(errMsg);
+ throw new HoodieException(errMsg);
+ }
+ Option<HoodieInstant> lastInstant =
client.getActiveTimeline().lastInstant();
+ if (lastInstant.isPresent()
+ &&
(!lastInstant.get().getAction().equals(HoodieTimeline.COMMIT_ACTION) ||
!lastInstant.get().isCompleted())) {
+ String errMsg = String.format("The last action must be a completed
compaction(commit[COMPLETED]) for this operation. "
+ + "But is %s[status=%s]", lastInstant.get().getAction(),
lastInstant.get().getState());
+ LOG.error(errMsg);
+ throw new HoodieException(errMsg);
+ }
+ break;
+ }
+
+ // Execute all pending compaction instants.
+ List<HoodieInstant> pendingCompactionInstants =
client.getActiveTimeline().filterPendingCompactionTimeline().getInstants();
+ LOG.info("Remain {} compaction instants to compact",
pendingCompactionInstants.size());
+ for (int i = 0; i < pendingCompactionInstants.size(); i++) {
+ HoodieInstant compactionInstant = pendingCompactionInstants.get(i);
+ LOG.info("compact {} instant {}", i + 1, compactionInstant);
+ String result = new CompactionCommand().compact(parallelism, "",
master, sparkMemory, retry, compactionInstant.getTimestamp(), propsFilePath,
configs);
+ LOG.info("compact instant {} result: {}", compactionInstant, result);
+ if (!result.startsWith(CompactionCommand.COMPACTION_EXE_SUCCESSFUL))
{
+ throw new HoodieException(String.format("Compact %s failed",
compactionInstant));
+ }
+ }
+ // refresh the timeline
+ client.reloadActiveTimeline();
+ Option<HoodieInstant> lastInstant =
client.getActiveTimeline().lastInstant();
+ if (lastInstant.isPresent()) {
+ // Before changing MOR to COW, a full compaction is needed to merge
all remaining log files.
+ if
(!lastInstant.get().getAction().equals(HoodieTimeline.COMMIT_ACTION) ||
!lastInstant.get().isCompleted()) {
+ LOG.info("There are remaining logfiles, will perform a full
compaction");
+ boolean compactionStrategyExist = false;
+ boolean compactionNumDeltaExist = false;
+ List<String> newConfigs = new ArrayList<>();
+ for (int i = 0; i < configs.length; i++) {
+ if
(configs[i].startsWith(HoodieCompactionConfig.COMPACTION_STRATEGY.key())) {
+ compactionStrategyExist = true;
+ configs[i] = String.format("%s=%s",
HoodieCompactionConfig.COMPACTION_STRATEGY.key(),
UnBoundedCompactionStrategy.class.getName());
+ }
+ if
(configs[i].startsWith(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key()))
{
+ compactionNumDeltaExist = true;
+ configs[i] = String.format("%s=%s",
HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1");
+ }
+ newConfigs.add(configs[i]);
+ }
+ if (!compactionStrategyExist) {
+ newConfigs.add(String.format("%s=%s",
HoodieCompactionConfig.COMPACTION_STRATEGY.key(),
UnBoundedCompactionStrategy.class.getName()));
+ }
+ if (!compactionNumDeltaExist) {
+ newConfigs.add(String.format("%s=%s",
HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1"));
+ }
+ String result = new CompactionCommand().compact(parallelism, "",
master, sparkMemory, retry, propsFilePath, newConfigs.toArray(new String[0]));
+ LOG.info("Full compaction result: {}", result);
+ if
(!result.equals(CompactionCommand.COMPACTION_SCH_EXE_SUCCESSFUL)) {
+ throw new HoodieException("Change table type to COW: schedule
and execute the full compaction failed");
+ }
+ }
+ }
+ break;
+ default:
+ throw new HoodieException("Unsupported change type " + changeType + ".
Only support MOR or COW.");
+ }
+
+ Properties updatedProps = new Properties();
+ updatedProps.putAll(oldProps);
+ // change the table type to target type
+ updatedProps.put(HoodieTableConfig.TYPE.key(), targetType);
+ Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME);
+ HoodieTableConfig.update(client.getFs(), metaPathDir, updatedProps);
+
+ HoodieCLI.refreshTableMetadata();
+ Map<String, String> newProps =
HoodieCLI.getTableMetaClient().getTableConfig().propsMap();
+ return renderOldNewProps(newProps, oldProps);
+ }
+
private static String renderOldNewProps(Map<String, String> newProps,
Map<String, String> oldProps) {
TreeSet<String> allPropKeys = new TreeSet<>();
allPropKeys.addAll(newProps.keySet().stream().map(Object::toString).collect(Collectors.toSet()));
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestTableCommand.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestTableCommand.java
new file mode 100644
index 00000000000..f10ba576475
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestTableCommand.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.cli.testutils.HoodieCLIIntegrationTestBase;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.HoodieIndex;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.shell.Shell;
+
+import java.io.IOException;
+import java.util.List;
+
+import static
org.apache.hudi.cli.commands.CompactionCommand.COMPACTION_SCH_SUCCESSFUL;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test class for {@link TableCommand#changeTableType}.
+ * <p/>
+ * A command that uses SparkLauncher needs to load the jars under lib, which are
generated during mvn package.
+ * Use integration test instead of unit test.
+ */
+@SpringBootTest(properties = {"spring.shell.interactive.enabled=false",
"spring.shell.command.script.enabled=false"})
+public class ITTestTableCommand extends HoodieCLIIntegrationTestBase {
+ private static final Logger LOG =
LoggerFactory.getLogger(ITTestTableCommand.class);
+
+ @Autowired
+ private Shell shell;
+ private String tablePath;
+ private String tableName = "test_table";
+
+ @Test
+ public void testChangeTableCOW2MOR() throws IOException {
+ tablePath = basePath + Path.SEPARATOR + tableName + "_cow2mor";
+ // Create table and connect
+ new TableCommand().createTable(
+ tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+ "", TimelineLayoutVersion.VERSION_1,
"org.apache.hudi.common.model.HoodieAvroPayload");
+
+ HoodieTestDataGenerator.createCommitFile(tablePath, "100",
jsc.hadoopConfiguration());
+
+ Object result = shell.evaluate(() -> "table change-table-type
--target-type MOR");
+
+ assertEquals(String.class, result.getClass());
+ assertTrue(((String) result).matches("(?s).*║ hoodie.table.type +│
COPY_ON_WRITE +│ MERGE_ON_READ +║.*"));
+ assertEquals(HoodieTableType.MERGE_ON_READ,
HoodieCLI.getTableMetaClient().getTableType());
+ }
+
+ @Test
+ public void testChangeTableMOR2COW() throws IOException {
+ tablePath = basePath + Path.SEPARATOR + tableName + "_mor2cow";
+ // Create table and connect
+ new TableCommand().createTable(
+ tablePath, "test_table", HoodieTableType.MERGE_ON_READ.name(),
+ "", TimelineLayoutVersion.VERSION_1,
"org.apache.hudi.common.model.HoodieAvroPayload");
+
+ Object result = shell.evaluate(() -> "table change-table-type
--target-type COW");
+
+ assertEquals(String.class, result.getClass());
+ assertTrue(((String) result).matches("(?s).*║ hoodie.table.type +│
MERGE_ON_READ +│ COPY_ON_WRITE +║.*"));
+ assertEquals(HoodieTableType.COPY_ON_WRITE,
HoodieCLI.getTableMetaClient().getTableType());
+ }
+
+ @Test
+ public void testChangeTableMOR2COW_withPendingCompactions() throws Exception
{
+ tablePath = basePath + Path.SEPARATOR + tableName + "_cow2mor";
+ // Create table and connect
+ new TableCommand().createTable(
+ tablePath, "test_table", HoodieTableType.MERGE_ON_READ.name(),
+ "", TimelineLayoutVersion.VERSION_1,
"org.apache.hudi.common.model.HoodieAvroPayload");
+
+ generateCommits();
+ // schedule a compaction
+ Object scheduleResult = shell.evaluate(() -> "compaction schedule
--hoodieConfigs hoodie.compact.inline.max.delta.commits=1 --sparkMaster local");
+ assertEquals(String.class, scheduleResult.getClass());
+ assertTrue(((String)
scheduleResult).startsWith(COMPACTION_SCH_SUCCESSFUL));
+ Option<HoodieInstant> lastInstant =
HoodieCLI.getTableMetaClient().getActiveTimeline().lastInstant();
+ assertTrue(lastInstant.isPresent());
+ assertEquals(HoodieTimeline.COMPACTION_ACTION,
lastInstant.get().getAction());
+ assertTrue(lastInstant.get().isRequested());
+
+ generateCommits();
+ Object result = shell.evaluate(() -> "table change-table-type
--target-type COW");
+
+ LOG.info("change to cow result \n{}", result);
+ assertEquals(String.class, result.getClass());
+ assertTrue(((String) result).matches("(?s).*║ hoodie.table.type +│
MERGE_ON_READ +│ COPY_ON_WRITE +║.*"));
+ // table change to cow type successfully
+ assertEquals(HoodieTableType.COPY_ON_WRITE,
HoodieCLI.getTableMetaClient().getTableType());
+ lastInstant =
HoodieCLI.getTableMetaClient().getActiveTimeline().lastInstant();
+ assertTrue(lastInstant.isPresent());
+ assertEquals(HoodieTimeline.COMMIT_ACTION, lastInstant.get().getAction());
+ assertTrue(lastInstant.get().isCompleted());
+ }
+
+ @Test
+ public void testChangeTableMOR2COW_withFullCompaction() throws Exception {
+ tablePath = basePath + Path.SEPARATOR + tableName + "_cow2mor";
+ // Create table and connect
+ new TableCommand().createTable(
+ tablePath, "test_table", HoodieTableType.MERGE_ON_READ.name(),
+ "", TimelineLayoutVersion.VERSION_1,
"org.apache.hudi.common.model.HoodieAvroPayload");
+
+ generateCommits();
+ Object result = shell.evaluate(() -> "table change-table-type
--target-type COW");
+
+ LOG.info("change to cow result \n{}", result);
+ assertEquals(String.class, result.getClass());
+ assertTrue(((String) result).matches("(?s).*║ hoodie.table.type +│
MERGE_ON_READ +│ COPY_ON_WRITE +║.*"));
+ // table change to cow type successfully
+ assertEquals(HoodieTableType.COPY_ON_WRITE,
HoodieCLI.getTableMetaClient().getTableType());
+ HoodieActiveTimeline activeTimeline =
HoodieCLI.getTableMetaClient().getActiveTimeline();
+ // no compaction left
+ assertTrue(activeTimeline.filter(instant ->
instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)).empty());
+ Option<HoodieInstant> lastInstant = activeTimeline.lastInstant();
+ assertTrue(lastInstant.isPresent());
+ assertEquals(HoodieTimeline.COMMIT_ACTION, lastInstant.get().getAction());
+ assertTrue(lastInstant.get().isCompleted());
+ }
+
+ @Test
+ public void testChangeTableMOR2COW_withoutCompaction() throws Exception {
+ tablePath = basePath + Path.SEPARATOR + tableName + "_cow2mor";
+ // Create table and connect
+ new TableCommand().createTable(
+ tablePath, "test_table", HoodieTableType.MERGE_ON_READ.name(),
+ "", TimelineLayoutVersion.VERSION_1,
"org.apache.hudi.common.model.HoodieAvroPayload");
+
+ generateCommits();
+ Object result = shell.evaluate(() -> "table change-table-type
--target-type COW --enable-compaction false");
+
+ LOG.info("change to cow result \n{}", result);
+ assertEquals(HoodieException.class, result.getClass());
+ assertTrue(((HoodieException) result).getMessage().startsWith("The last
action must be a completed compaction"));
+ // table change to cow type failed
+ assertEquals(HoodieTableType.MERGE_ON_READ,
HoodieCLI.getTableMetaClient().getTableType());
+ }
+
+ private void generateCommits() throws IOException {
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+
+ // Create the write client to write some records in
+ HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
+ .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2, 2)
+ .withDeleteParallelism(2)
+ .forTable(tableName)
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
+
+ try (SparkRDDWriteClient<HoodieAvroPayload> client = new
SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg)) {
+ String instantTime = client.createNewInstantTime();
+ List<HoodieRecord> records = dataGen.generateInserts(instantTime, 2);
+ upsert(jsc, client, records, instantTime);
+
+ instantTime = client.createNewInstantTime();
+ List<HoodieRecord> recordsToUpdate =
dataGen.generateUpdates(instantTime, 2);
+ records.addAll(recordsToUpdate);
+ upsert(jsc, client, records, instantTime);
+ }
+ }
+
+ private void upsert(JavaSparkContext jsc,
SparkRDDWriteClient<HoodieAvroPayload> client,
+ List<HoodieRecord> records, String newCommitTime) throws
IOException {
+ client.startCommitWithTime(newCommitTime);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ operateFunc(SparkRDDWriteClient::upsert, client, writeRecords,
newCommitTime);
+ }
+
+ private void operateFunc(
+ Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient,
JavaRDD<HoodieRecord>, String> writeFn,
+ SparkRDDWriteClient<HoodieAvroPayload> client, JavaRDD<HoodieRecord>
writeRecords, String commitTime)
+ throws IOException {
+ writeFn.apply(client, writeRecords, commitTime);
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index fbc8c0a29d6..8dc53b65edb 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -519,7 +519,19 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
private static void createCommitFile(String basePath, String instantTime,
Configuration configuration, HoodieCommitMetadata commitMetadata) {
Arrays.asList(HoodieTimeline.makeCommitFileName(instantTime + "_" +
InProcessTimeGenerator.createNewInstantTime()),
HoodieTimeline.makeInflightCommitFileName(instantTime),
- HoodieTimeline.makeRequestedCommitFileName(instantTime))
+ HoodieTimeline.makeRequestedCommitFileName(instantTime))
+ .forEach(f -> createMetadataFile(f, basePath, configuration,
commitMetadata));
+ }
+
+ public static void createDeltaCommitFile(String basePath, String
instantTime, Configuration configuration) {
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ createDeltaCommitFile(basePath, instantTime, configuration,
commitMetadata);
+ }
+
+ private static void createDeltaCommitFile(String basePath, String
instantTime, Configuration configuration, HoodieCommitMetadata commitMetadata) {
+ Arrays.asList(HoodieTimeline.makeDeltaFileName(instantTime + "_" +
InProcessTimeGenerator.createNewInstantTime()),
+ HoodieTimeline.makeInflightDeltaFileName(instantTime),
+ HoodieTimeline.makeRequestedDeltaFileName(instantTime))
.forEach(f -> createMetadataFile(f, basePath, configuration,
commitMetadata));
}