This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b873d8db31 [HUDI-4433] hudi-cli repair deduplicate not working with
non-partitioned dataset (#6349)
b873d8db31 is described below
commit b873d8db31892b964c06fb5a85a835768e5b38f4
Author: ChanKyeong Won <[email protected]>
AuthorDate: Sun Sep 25 15:35:46 2022 +0900
[HUDI-4433] hudi-cli repair deduplicate not working with non-partitioned
dataset (#6349)
When using the repair deduplicate command with hudi-cli,
there was no way to run it on a non-partitioned dataset,
so give the --duplicatedPartitionPath CLI option an empty-string default value, allowing it to be omitted.
Co-authored-by: Xingjun Wang <[email protected]>
---
.../apache/hudi/cli/commands/RepairsCommand.java | 2 +-
.../hudi/cli/integ/ITTestRepairsCommand.java | 52 ++++++++++++++++++++++
.../common/testutils/HoodieTestDataGenerator.java | 1 +
3 files changed, 54 insertions(+), 1 deletion(-)
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index f0ff924e22..2b11e20a10 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -69,7 +69,7 @@ public class RepairsCommand {
@ShellMethod(key = "repair deduplicate",
value = "De-duplicate a partition path contains duplicates & produce
repaired files to replace with")
public String deduplicate(
- @ShellOption(value = {"--duplicatedPartitionPath"}, help = "Partition
Path containing the duplicates")
+ @ShellOption(value = {"--duplicatedPartitionPath"}, defaultValue = "",
help = "Partition Path containing the duplicates")
final String duplicatedPartitionPath,
@ShellOption(value = {"--repairedOutputPath"}, help = "Location to place
the repaired files")
final String repairedOutputPath,
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
index 5938a8ffe2..69db47136e 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
@@ -69,6 +69,7 @@ public class ITTestRepairsCommand extends
HoodieCLIIntegrationTestBase {
private String duplicatedPartitionPath;
private String duplicatedPartitionPathWithUpdates;
private String duplicatedPartitionPathWithUpserts;
+ private String duplicatedNoPartitionPath;
private String repairedOutputPath;
private HoodieFileFormat fileFormat;
@@ -78,6 +79,7 @@ public class ITTestRepairsCommand extends
HoodieCLIIntegrationTestBase {
duplicatedPartitionPath =
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
duplicatedPartitionPathWithUpdates =
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
duplicatedPartitionPathWithUpserts =
HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+ duplicatedNoPartitionPath = HoodieTestDataGenerator.NO_PARTITION_PATH;
repairedOutputPath = Paths.get(basePath, "tmp").toString();
HoodieCLI.conf = jsc.hadoopConfiguration();
@@ -135,6 +137,23 @@ public class ITTestRepairsCommand extends
HoodieCLIIntegrationTestBase {
.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH,
"7", dupRecords)
.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH,
"8", dupRecords);
+ // init cow table for non-partitioned table tests
+ String cowNonPartitionedTablePath = Paths.get(basePath,
"cow_table_non_partitioned").toString();
+
+ // Create cow table and connect
+ new TableCommand().createTable(
+ cowNonPartitionedTablePath, "cow_table_non_partitioned",
HoodieTableType.COPY_ON_WRITE.name(),
+ "", TimelineLayoutVersion.VERSION_1,
"org.apache.hudi.common.model.HoodieAvroPayload");
+
+ HoodieSparkWriteableTestTable cowNonPartitionedTable =
HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);
+
+ cowNonPartitionedTable.addCommit("20160401010101")
+ .withInserts(HoodieTestDataGenerator.NO_PARTITION_PATH, "1",
hoodieRecords1)
+ .getFileIdWithLogFile(HoodieTestDataGenerator.NO_PARTITION_PATH);
+
+ cowNonPartitionedTable.addCommit("20160401010202")
+ .withInserts(HoodieTestDataGenerator.NO_PARTITION_PATH, "2",
dupRecords);
+
fileFormat = metaClient.getTableConfig().getBaseFileFormat();
}
@@ -232,6 +251,39 @@ public class ITTestRepairsCommand extends
HoodieCLIIntegrationTestBase {
assertEquals(100, result.count());
}
+ /**
+ * Test case dry run deduplicate for non-partitioned dataset.
+ */
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ public void testDeduplicateNoPartitionWithInserts(HoodieTableType tableType)
throws IOException {
+ String tablePath = Paths.get(basePath,
"cow_table_non_partitioned").toString();
+ connectTableAndReloadMetaClient(tablePath);
+ HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient,
+
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+ fs.listStatus(new Path(Paths.get(tablePath,
duplicatedNoPartitionPath).toString())));
+ List<String> filteredStatuses =
fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+ assertEquals(2, filteredStatuses.size(), "There should be 2 files.");
+
+ // Before deduplicate, all files contain 110 records
+ String[] files = filteredStatuses.toArray(new String[0]);
+ Dataset df = readFiles(files);
+ assertEquals(110, df.count());
+
+ // use default value without specifying duplicatedPartitionPath
+ String cmdStr = String.format("repair deduplicate --repairedOutputPath %s
--sparkMaster %s",
+ repairedOutputPath, "local");
+ Object resultForCmd = shell.evaluate(() -> cmdStr);
+ assertTrue(ShellEvaluationResultUtil.isSuccess(resultForCmd));
+ assertEquals(RepairsCommand.DEDUPLICATE_RETURN_PREFIX +
repairedOutputPath, resultForCmd.toString());
+
+ // After deduplicate, there are 100 records
+ FileStatus[] fileStatus = fs.listStatus(new Path(repairedOutputPath));
+ files = Arrays.stream(fileStatus).map(status ->
status.getPath().toString()).toArray(String[]::new);
+ Dataset result = readFiles(files);
+ assertEquals(100, result.count());
+ }
+
/**
* Test case for real run deduplicate.
*/
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 81a1f32ca2..8614060126 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -89,6 +89,7 @@ public class HoodieTestDataGenerator implements AutoCloseable
{
// with default bloom filter with 60,000 entries and 0.000000001 FPRate
public static final int BLOOM_FILTER_BYTES = 323495;
private static Logger logger =
LogManager.getLogger(HoodieTestDataGenerator.class);
+ public static final String NO_PARTITION_PATH = "";
public static final String DEFAULT_FIRST_PARTITION_PATH = "2016/03/15";
public static final String DEFAULT_SECOND_PARTITION_PATH = "2015/03/16";
public static final String DEFAULT_THIRD_PARTITION_PATH = "2015/03/17";