This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3a2ae16 [HUDI-781] Introduce HoodieTestTable for test preparation
(#1997)
3a2ae16 is described below
commit 3a2ae16961131ed22e3b09877ff00cc42d47aa7d
Author: Raymond Xu <[email protected]>
AuthorDate: Thu Aug 20 20:46:33 2020 -0700
[HUDI-781] Introduce HoodieTestTable for test preparation (#1997)
---
.../cli/commands/TestUpgradeDowngradeCommand.java | 13 +-
.../org/apache/hudi/io/HoodieAppendHandle.java | 1 +
.../org/apache/hudi/io/HoodieCreateHandle.java | 1 +
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 1 +
.../org/apache/hudi/io/HoodieRowCreateHandle.java | 1 +
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 3 +-
.../java/org/apache/hudi/table/MarkerFiles.java | 15 +-
.../rollback/MarkerBasedRollbackStrategy.java | 8 +-
.../table/upgrade/ZeroToOneUpgradeHandler.java | 2 +-
.../TestHoodieClientOnCopyOnWriteStorage.java | 15 +-
.../java/org/apache/hudi/table/TestCleaner.java | 395 +++++++--------------
.../apache/hudi/table/TestConsistencyGuard.java | 28 +-
.../org/apache/hudi/table/TestMarkerFiles.java | 10 +-
.../table/action/commit/TestUpsertPartitioner.java | 8 +-
.../table/action/compact/TestHoodieCompactor.java | 7 +-
.../rollback/TestMarkerBasedRollbackStrategy.java | 69 ++--
.../hudi/testutils/HoodieClientTestUtils.java | 126 +------
.../java/org/apache/hudi/common/model}/IOType.java | 15 +-
.../hudi/common/testutils/FileCreateUtils.java | 147 ++++++++
.../hudi/common/testutils/HoodieTestTable.java | 232 ++++++++++++
.../hudi/common/testutils/HoodieTestUtils.java | 86 +++--
21 files changed, 662 insertions(+), 521 deletions(-)
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
index 4c0479a..2334f12 100644
---
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
+++
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
@@ -21,13 +21,14 @@ package org.apache.hudi.cli.commands;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -77,11 +78,11 @@ public class TestUpgradeDowngradeCommand extends
AbstractShellIntegrationTest {
// generate commit and marker files for inflight commit 101
for (String commitTime : Arrays.asList(commitTime2)) {
HoodieTestUtils.createDataFile(tablePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitTime, "file-1");
- HoodieClientTestUtils.createMarkerFile(tablePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitTime, "file-1");
+ FileCreateUtils.createMarkerFile(tablePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitTime, "file-1",
IOType.MERGE);
HoodieTestUtils.createDataFile(tablePath,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, commitTime, "file-2");
- HoodieClientTestUtils.createMarkerFile(tablePath,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, commitTime, "file-2");
+ FileCreateUtils.createMarkerFile(tablePath,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, commitTime, "file-2",
IOType.MERGE);
HoodieTestUtils.createDataFile(tablePath,
HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, commitTime, "file-3");
- HoodieClientTestUtils.createMarkerFile(tablePath,
HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, commitTime, "file-3");
+ FileCreateUtils.createMarkerFile(tablePath,
HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, commitTime, "file-3",
IOType.MERGE);
}
}
@@ -98,7 +99,7 @@ public class TestUpgradeDowngradeCommand extends
AbstractShellIntegrationTest {
// verify marker files for inflight commit exists
for (String partitionPath :
HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS) {
- assertEquals(1, HoodieClientTestUtils.getTotalMarkerFileCount(tablePath,
partitionPath, "101"));
+ assertEquals(1, FileCreateUtils.getTotalMarkerFileCount(tablePath,
partitionPath, "101", IOType.MERGE));
}
SparkMain.upgradeOrDowngradeTable(jsc, tablePath,
HoodieTableVersion.ZERO.name());
@@ -112,7 +113,7 @@ public class TestUpgradeDowngradeCommand extends
AbstractShellIntegrationTest {
// verify marker files are non-existent
for (String partitionPath :
HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS) {
- assertEquals(0, HoodieClientTestUtils.getTotalMarkerFileCount(tablePath,
partitionPath, "101"));
+ assertEquals(0, FileCreateUtils.getTotalMarkerFileCount(tablePath,
partitionPath, "101", IOType.MERGE));
}
}
diff --git
a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 7a8e5ab..7996a77 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
+import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
diff --git
a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 705e98d..5a76dc7 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
+import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
diff --git
a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index f0ea284..8d54065 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
+import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
diff --git
a/hudi-client/src/main/java/org/apache/hudi/io/HoodieRowCreateHandle.java
b/hudi-client/src/main/java/org/apache/hudi/io/HoodieRowCreateHandle.java
index 723d9f9..fa160c6 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieRowCreateHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieRowCreateHandle.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.config.HoodieWriteConfig;
diff --git
a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index d148b1b..5ea8c38 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -33,13 +34,13 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hudi.table.MarkerFiles;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
index 8a310fd..9577cea 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
@@ -18,26 +18,27 @@
package org.apache.hudi.table;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.io.IOType;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.Serializable;
-import java.util.Arrays;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
diff --git
a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
index 40b81a2..2a137b4 100644
---
a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
+++
b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
@@ -18,10 +18,10 @@
package org.apache.hudi.table.action.rollback;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -29,19 +29,21 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
-import org.apache.hudi.io.IOType;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
+
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
-import scala.Tuple2;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import scala.Tuple2;
+
/**
* Performs rollback using marker files generated during the write..
*/
diff --git
a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 4960ff5..e9c9e28 100644
---
a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++
b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -21,13 +21,13 @@ package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
-import org.apache.hudi.io.IOType;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
diff --git
a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 51d8a6a..24e538e 100644
---
a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++
b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -47,7 +48,6 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
-import org.apache.hudi.io.IOType;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.commit.WriteHelper;
@@ -80,6 +80,7 @@ import java.util.UUID;
import java.util.stream.Collectors;
import static
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0;
+import static
org.apache.hudi.common.testutils.FileCreateUtils.getBaseFileCountsForPaths;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
@@ -506,8 +507,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
// verify one basefile per partition
String[] fullExpectedPartitionPaths =
getFullPartitionPaths(expectedPartitionPathRecKeyPairs.stream().map(Pair::getLeft).toArray(String[]::new));
- Map<String, Integer> baseFileCounts =
getBaseFileCounts(fullExpectedPartitionPaths);
- for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+ Map<String, Long> baseFileCounts = getBaseFileCountsForPaths(basePath, fs,
fullExpectedPartitionPaths);
+ for (Map.Entry<String, Long> entry : baseFileCounts.entrySet()) {
assertEquals(1, entry.getValue());
}
assertTrue(baseFileCounts.entrySet().stream().allMatch(entry ->
entry.getValue() == 1));
@@ -532,9 +533,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
// verify that there are more than 1 basefiles per partition
// we can't guarantee randomness in partitions where records are
distributed. So, verify atleast one partition has more than 1 basefile.
- baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+ baseFileCounts = getBaseFileCountsForPaths(basePath, fs,
fullPartitionPaths);
assertTrue(baseFileCounts.entrySet().stream().filter(entry ->
entry.getValue() > 1).count() >= 1,
- "Atleast one partition should have more than 1 base file after 2nd
batch of writes");
+ "At least one partition should have more than 1 base file after 2nd
batch of writes");
// Write 3 (upserts to records from batch 1 with diff partition path)
newCommitTime = "003";
@@ -605,10 +606,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
return fullPartitionPaths;
}
- private Map<String, Integer> getBaseFileCounts(String[] fullPartitionPaths) {
- return HoodieClientTestUtils.getBaseFileCountForPaths(basePath, fs,
fullPartitionPaths);
- }
-
private void
assertActualAndExpectedPartitionPathRecordKeyMatches(Set<Pair<String, String>>
expectedPartitionPathRecKeyPairs,
List<Pair<String, String>> actualPartitionPathRecKeyPairs) {
// verify all partitionpath, record key matches
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index a064788..6dacc81 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -40,6 +40,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -50,8 +51,8 @@ import
org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigra
import
org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator;
import
org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler;
import org.apache.hudi.common.table.view.TableFileSystemView;
-import org.apache.hudi.common.testutils.FileSystemTestUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CollectionUtils;
@@ -64,17 +65,21 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.action.clean.CleanPlanner;
import org.apache.hudi.testutils.HoodieClientTestBase;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
@@ -501,136 +506,101 @@ public class TestCleaner extends HoodieClientTestBase {
}
/**
- * Test HoodieTable.clean() Cleaning by versions for COW table.
- */
- @Test
- public void testKeepLatestFileVersions() throws IOException {
- testKeepLatestFileVersions(false);
- }
-
- /**
- * Test HoodieTable.clean() Cleaning by version logic for COW table with
Bootstrap source file clean enable.
- */
- @Test
- public void testBootstrapSourceFileCleanWithKeepLatestFileVersions() throws
IOException {
- testKeepLatestFileVersions(true);
- }
-
- /**
* Test HoodieTable.clean() Cleaning by versions logic.
*/
- public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean)
throws IOException {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean)
throws Exception {
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
.build();
+ HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+ String p0 = "2020/01/01";
+ String p1 = "2020/01/02";
+ Map<String, List<BootstrapFileMapping>> bootstrapMapping =
enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData(p0, p1) : null;
// make 1 commit, with 1 file per partition
- HoodieTestUtils.createCommitFiles(basePath, "00000000000001");
-
- Map<String, List<BootstrapFileMapping>> bootstrapMapping =
enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData() : null;
-
- String file1P0C0 = enableBootstrapSourceClean ?
bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getFileId()
+ String file1P0C0 = enableBootstrapSourceClean ?
bootstrapMapping.get(p0).get(0).getFileId()
: UUID.randomUUID().toString();
- String file1P1C0 = enableBootstrapSourceClean ?
bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getFileId()
+ String file1P1C0 = enableBootstrapSourceClean ?
bootstrapMapping.get(p1).get(0).getFileId()
: UUID.randomUUID().toString();
- HoodieTestUtils
- .createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
file1P0C0); // insert
- HoodieTestUtils
- .createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001",
file1P1C0); // insert
+ testTable.addCommit("00000000000001").withUpdates(p0,
file1P0C0).withUpdates(p1, file1P1C0);
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files");
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
- file1P0C0));
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001",
- file1P1C0));
+ assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0));
+ assertTrue(testTable.fileExists(p1, "00000000000001", file1P1C0));
// make next commit, with 1 insert & 1 update per partition
- HoodieTestUtils.createCommitFiles(basePath, "00000000000002");
- metaClient = HoodieTableMetaClient.reload(metaClient);
-
- String file2P0C1 =
- HoodieTestUtils.createNewDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002"); //
insert
- String file2P1C1 =
- HoodieTestUtils.createNewDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002"); //
insert
- HoodieTestUtils.createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
file1P0C0); // update
- HoodieTestUtils.createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002",
file1P1C0); // update
+ Map<String, String> partitionAndFileId002 =
testTable.addCommit("00000000000002")
+ .withUpdates(p0, file1P0C0)
+ .withUpdates(p1, file1P1C0)
+ .withInserts(p0, p1);
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config);
// enableBootstrapSourceClean would delete the bootstrap base file as the
same time
- HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+ HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo, p0);
assertEquals(enableBootstrapSourceClean ? 2 : 1,
cleanStat.getSuccessDeleteFiles().size()
+ (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
: cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean
at least 1 file");
if (enableBootstrapSourceClean) {
HoodieFileStatus fstatus =
-
bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getBoostrapFileStatus();
+ bootstrapMapping.get(p0).get(0).getBoostrapFileStatus();
// This ensures full path is recorded in metadata.
assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
"Successful delete files were " +
cleanStat.getSuccessDeleteBootstrapBaseFiles()
+ " but did not contain " + fstatus.getPath().getUri());
- assertFalse(new File(bootstrapMapping.get(
-
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getBoostrapFileStatus().getPath().getUri()).exists());
+ assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
+ p0).get(0).getBoostrapFileStatus().getPath().getUri())));
}
- cleanStat = getCleanStat(hoodieCleanStatsTwo,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH);
+ cleanStat = getCleanStat(hoodieCleanStatsTwo, p1);
+ String file2P0C1 = partitionAndFileId002.get(p0);
+ String file2P1C1 = partitionAndFileId002.get(p1);
+ assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1));
+ assertTrue(testTable.fileExists(p1, "00000000000002", file2P1C1));
+ assertFalse(testTable.fileExists(p0, "00000000000001", file1P0C0));
+ assertFalse(testTable.fileExists(p1, "00000000000001", file1P1C0));
assertEquals(enableBootstrapSourceClean ? 2 : 1,
cleanStat.getSuccessDeleteFiles().size()
+ (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
: cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean
at least 1 file");
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
- file2P0C1));
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002",
- file2P1C1));
- assertFalse(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
- file1P0C0));
if (enableBootstrapSourceClean) {
HoodieFileStatus fstatus =
-
bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getBoostrapFileStatus();
+ bootstrapMapping.get(p1).get(0).getBoostrapFileStatus();
// This ensures full path is recorded in metadata.
assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
"Successful delete files were " +
cleanStat.getSuccessDeleteBootstrapBaseFiles()
+ " but did not contain " + fstatus.getPath().getUri());
- assertFalse(new File(bootstrapMapping.get(
-
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getBoostrapFileStatus().getPath().getUri()).exists());
+ assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
+ p1).get(0).getBoostrapFileStatus().getPath().getUri())));
}
- assertFalse(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
- "00000000000001", file1P1C0));
// make next commit, with 2 updates to existing files, and 1 insert
- HoodieTestUtils.createCommitFiles(basePath, "00000000000003");
- metaClient = HoodieTableMetaClient.reload(metaClient);
-
- HoodieTestUtils.createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
file1P0C0); // update
- HoodieTestUtils.createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
file2P0C1); // update
- String file3P0C2 =
- HoodieTestUtils.createNewDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003");
-
+ String file3P0C2 = testTable.addCommit("00000000000003")
+ .withUpdates(p0, file1P0C0, file2P0C1)
+ .withInserts(p0, "00000000000003").get(p0);
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config);
assertEquals(2,
- getCleanStat(hoodieCleanStatsThree,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+ getCleanStat(hoodieCleanStatsThree, p0)
.getSuccessDeleteFiles().size(), "Must clean two files");
- assertFalse(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
- file1P0C0));
- assertFalse(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
- file2P0C1));
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
- file3P0C2));
+ assertFalse(testTable.fileExists(p0, "00000000000002", file1P0C0));
+ assertFalse(testTable.fileExists(p0, "00000000000002", file2P0C1));
+ assertTrue(testTable.fileExists(p0, "00000000000003", file3P0C2));
// No cleaning on partially written file, with no commit.
- HoodieTestUtils.createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004",
file3P0C2); // update
+ testTable.forCommit("00000000000004").withUpdates(p0, file3P0C2);
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files");
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
- file3P0C2));
+ assertTrue(testTable.fileExists(p0, "00000000000003", file3P0C2));
}
/**
* Test HoodieTable.clean() Cleaning by versions logic for MOR table with
Log files.
*/
@Test
- public void testKeepLatestFileVersionsMOR() throws IOException {
+ public void testKeepLatestFileVersionsMOR() throws Exception {
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
@@ -638,35 +608,29 @@ public class TestCleaner extends HoodieClientTestBase {
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
.build();
- HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
+ HoodieTableMetaClient metaClient = HoodieTestUtils.init(hadoopConf,
basePath, HoodieTableType.MERGE_ON_READ);
+ HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+ String p0 = "2020/01/01";
// Make 3 files, one base file and 2 log files associated with base file
- String file1P0 =
- HoodieTestUtils.createNewDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000");
- String file2P0L0 = HoodieTestUtils.createNewLogFile(fs, basePath,
- HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", file1P0,
Option.empty());
- HoodieTestUtils.createNewLogFile(fs, basePath,
- HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", file1P0,
Option.of(2));
- // make 1 compaction commit
- HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "000");
-
- // Make 4 files, one base file and 3 log files associated with base file
- HoodieTestUtils.createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0);
- file2P0L0 = HoodieTestUtils.createNewLogFile(fs, basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
- "001", file1P0, Option.of(3));
- // make 1 compaction commit
- HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "001");
+ String file1P0 = testTable.addDeltaCommit("000").withInserts(p0).get(p0);
+ testTable.forDeltaCommit("000")
+ .withLogFile(p0, file1P0, 1)
+ .withLogFile(p0, file1P0, 2);
+
+ // Make 2 files, one base file and 1 log file associated with base file
+ testTable.addDeltaCommit("001")
+ .withUpdates(p0, file1P0)
+ .withLogFile(p0, file1P0, 3);
List<HoodieCleanStat> hoodieCleanStats = runCleaner(config);
assertEquals(3,
- getCleanStat(hoodieCleanStats,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
+ getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles()
.size(), "Must clean three files, one parquet and 2 log files");
- assertFalse(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
- file1P0));
- assertFalse(HoodieTestUtils.doesLogFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
- file2P0L0, Option.empty()));
- assertFalse(HoodieTestUtils.doesLogFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
- file2P0L0, Option.of(2)));
+ assertFalse(testTable.fileExists(p0, "000", file1P0));
+ assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2));
+ assertTrue(testTable.fileExists(p0, "001", file1P0));
+ assertTrue(testTable.logFileExists(p0, "001", file1P0, 3));
}
@Test
@@ -835,43 +799,21 @@ public class TestCleaner extends HoodieClientTestBase {
}
}
- /**
- * Test HoodieTable.clean() Cleaning by commit logic for COW table.
- */
- @Test
- public void testKeepLatestCommits() throws IOException {
- testKeepLatestCommits(false, false, false);
- }
-
- /**
- * Test HoodieTable.clean() Cleaning by commit logic for COW table. Here the
operations are simulated
- * such that first clean attempt failed after files were cleaned and a
subsequent cleanup succeeds.
- */
- @Test
- public void testKeepLatestCommitsWithFailureRetry() throws IOException {
- testKeepLatestCommits(true, false, false);
- }
-
- /**
- * Test HoodieTable.clean() Cleaning by commit logic for COW table.
- */
- @Test
- public void testKeepLatestCommitsIncrMode() throws IOException {
- testKeepLatestCommits(false, true, false);
- }
-
- /**
- * Test HoodieTable.clean() Cleaning by commit logic for COW table with
Bootstrap source file clean enable.
- */
- @Test
- public void testBootstrapSourceFileCleanWithKeepLatestCommits() throws
IOException {
- testKeepLatestCommits(false, false, true);
+ private static Stream<Arguments> argumentsForTestKeepLatestCommits() {
+ return Stream.of(
+ Arguments.of(false, false, false),
+ Arguments.of(true, false, false),
+ Arguments.of(false, true, false),
+ Arguments.of(false, false, true)
+ );
}
/**
* Test HoodieTable.clean() Cleaning by commit logic for COW table.
*/
- private void testKeepLatestCommits(boolean simulateFailureRetry, boolean
enableIncrementalClean, boolean enableBootstrapSourceClean) throws IOException {
+ @ParameterizedTest
+ @MethodSource("argumentsForTestKeepLatestCommits")
+ public void testKeepLatestCommits(boolean simulateFailureRetry, boolean
enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withIncrementalCleaningMode(enableIncrementalClean)
@@ -879,25 +821,23 @@ public class TestCleaner extends HoodieClientTestBase {
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
.build();
- Map<String, List<BootstrapFileMapping>> bootstrapMapping =
enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData() : null;
+ HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+ String p0 = "2020/01/01";
+ String p1 = "2020/01/02";
+ Map<String, List<BootstrapFileMapping>> bootstrapMapping =
enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData(p0, p1) : null;
// make 1 commit, with 1 file per partition
- HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000001");
-
- String file1P0C0 = enableBootstrapSourceClean ?
bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getFileId()
+ String file1P0C0 = enableBootstrapSourceClean ?
bootstrapMapping.get(p0).get(0).getFileId()
: UUID.randomUUID().toString();
- String file1P1C0 = enableBootstrapSourceClean ?
bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getFileId()
+ String file1P1C0 = enableBootstrapSourceClean ?
bootstrapMapping.get(p1).get(0).getFileId()
: UUID.randomUUID().toString();
- HoodieTestUtils
- .createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
file1P0C0); // insert
- HoodieTestUtils
- .createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001",
file1P1C0); // insert
+ testTable.addInflightCommit("00000000000001").withUpdates(p0,
file1P0C0).withUpdates(p1, file1P1C0);
HoodieCommitMetadata commitMetadata = generateCommitMetadata(
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
{
- put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
CollectionUtils.createImmutableList(file1P0C0));
- put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
CollectionUtils.createImmutableList(file1P1C0));
+ put(p0, CollectionUtils.createImmutableList(file1P0C0));
+ put(p1, CollectionUtils.createImmutableList(file1P1C0));
}
})
);
@@ -909,29 +849,18 @@ public class TestCleaner extends HoodieClientTestBase {
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config,
simulateFailureRetry);
assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions
and clean any files");
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
- file1P0C0));
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001",
- file1P1C0));
+ assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0));
+ assertTrue(testTable.fileExists(p1, "00000000000001", file1P1C0));
// make next commit, with 1 insert & 1 update per partition
- HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000002");
- metaClient = HoodieTableMetaClient.reload(metaClient);
-
- String file2P0C1 =
- HoodieTestUtils
- .createNewDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002"); //
insert
- String file2P1C1 =
- HoodieTestUtils
- .createNewDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002"); //
insert
- HoodieTestUtils
- .createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
file1P0C0); // update
- HoodieTestUtils
- .createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002",
file1P1C0); // update
+ Map<String, String> partitionAndFileId002 =
testTable.addInflightCommit("00000000000002").withInserts(p0, p1);
+ String file2P0C1 = partitionAndFileId002.get(p0);
+ String file2P1C1 = partitionAndFileId002.get(p1);
+ testTable.forCommit("00000000000002").withUpdates(p0,
file1P0C0).withUpdates(p1, file1P1C0);
commitMetadata = generateCommitMetadata(new HashMap<String,
List<String>>() {
{
- put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
- put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
CollectionUtils.createImmutableList(file1P1C0, file2P1C1));
+ put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
+ put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1));
}
});
metaClient.getActiveTimeline().saveAsComplete(
@@ -939,28 +868,18 @@ public class TestCleaner extends HoodieClientTestBase {
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config,
simulateFailureRetry);
assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions
and clean any files");
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
- file2P0C1));
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002",
- file2P1C1));
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
- file1P0C0));
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001",
- file1P1C0));
+ assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1));
+ assertTrue(testTable.fileExists(p1, "00000000000002", file2P1C1));
+ assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0));
+ assertTrue(testTable.fileExists(p1, "00000000000001", file1P1C0));
// make next commit, with 2 updates to existing files, and 1 insert
- HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000003");
- metaClient = HoodieTableMetaClient.reload(metaClient);
-
- HoodieTestUtils
- .createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
file1P0C0); // update
- HoodieTestUtils
- .createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
file2P0C1); // update
- String file3P0C2 =
- HoodieTestUtils.createNewDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003");
-
+ String file3P0C2 = testTable.addInflightCommit("00000000000003")
+ .withUpdates(p0, file1P0C0)
+ .withUpdates(p0, file2P0C1)
+ .withInserts(p0).get(p0);
commitMetadata = generateCommitMetadata(CollectionUtils
-
.createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+ .createImmutableMap(p0,
CollectionUtils.createImmutableList(file1P0C0, file2P0C1,
file3P0C2)));
metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION,
"00000000000003"),
@@ -969,57 +888,41 @@ public class TestCleaner extends HoodieClientTestBase {
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config,
simulateFailureRetry);
assertEquals(0, hoodieCleanStatsThree.size(),
"Must not clean any file. We have to keep 1 version before the latest
commit time to keep");
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
- file1P0C0));
+ assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0));
// make next commit, with 2 updates to existing files, and 1 insert
- HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000004");
- metaClient = HoodieTableMetaClient.reload(metaClient);
-
- HoodieTestUtils
- .createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004",
file1P0C0); // update
- HoodieTestUtils
- .createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004",
file2P0C1); // update
- String file4P0C3 =
- HoodieTestUtils.createNewDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004");
+ String file4P0C3 = testTable.addInflightCommit("00000000000004")
+ .withUpdates(p0, file1P0C0)
+ .withUpdates(p0, file2P0C1)
+ .withInserts(p0).get(p0);
commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(
- HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3)));
+ p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1,
file4P0C3)));
metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION,
"00000000000004"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config,
simulateFailureRetry);
// enableBootstrapSourceClean would delete the bootstrap base file as the
same time
- HoodieCleanStat partitionCleanStat =
- getCleanStat(hoodieCleanStatsFour,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+ HoodieCleanStat partitionCleanStat = getCleanStat(hoodieCleanStatsFour,
p0);
assertEquals(enableBootstrapSourceClean ? 2 : 1,
partitionCleanStat.getSuccessDeleteFiles().size()
+ (partitionCleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
: partitionCleanStat.getSuccessDeleteBootstrapBaseFiles().size()),
"Must clean at least one old file");
-
- assertFalse(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
- file1P0C0));
+ assertFalse(testTable.fileExists(p0, "00000000000001", file1P0C0));
+ assertTrue(testTable.fileExists(p0, "00000000000002", file1P0C0));
+ assertTrue(testTable.fileExists(p0, "00000000000003", file1P0C0));
+ assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1));
+ assertTrue(testTable.fileExists(p0, "00000000000003", file2P0C1));
+ assertTrue(testTable.fileExists(p0, "00000000000003", file3P0C2));
+ assertTrue(testTable.fileExists(p0, "00000000000004", file4P0C3));
if (enableBootstrapSourceClean) {
- assertFalse(new File(bootstrapMapping.get(
-
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getBoostrapFileStatus().getPath().getUri()).exists());
+ assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
+ p0).get(0).getBoostrapFileStatus().getPath().getUri())));
}
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
- file1P0C0));
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
- file1P0C0));
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
- file2P0C1));
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
- file2P0C1));
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
- file3P0C2));
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004",
- file4P0C3));
// No cleaning on partially written file, with no commit.
- HoodieTestUtils
- .createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000005",
file3P0C2); // update
- commitMetadata =
generateCommitMetadata(CollectionUtils.createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+ testTable.forCommit("00000000000005").withUpdates(p0, file3P0C2);
+ commitMetadata =
generateCommitMetadata(CollectionUtils.createImmutableMap(p0,
CollectionUtils.createImmutableList(file3P0C2)));
metaClient.getActiveTimeline().createNewInstant(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION,
"00000000000005"));
@@ -1027,13 +930,11 @@ public class TestCleaner extends HoodieClientTestBase {
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION,
"00000000000005"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config,
simulateFailureRetry);
- HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
- assertEquals(0,
- cleanStat != null ? cleanStat.getSuccessDeleteFiles().size() : 0,
"Must not clean any files");
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
- file1P0C0));
- assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
- file2P0C1));
+ HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, p0);
+ assertNull(cleanStat, "Must not clean any files");
+ assertTrue(testTable.fileExists(p0, "00000000000002", file1P0C0));
+ assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1));
+ assertTrue(testTable.fileExists(p0, "00000000000005", file3P0C2));
}
/**
@@ -1041,7 +942,7 @@ public class TestCleaner extends HoodieClientTestBase {
* @return Partition to BootstrapFileMapping Map
* @throws IOException
*/
- private Map<String, List<BootstrapFileMapping>>
generateBootstrapIndexAndSourceData() throws IOException {
+ private Map<String, List<BootstrapFileMapping>>
generateBootstrapIndexAndSourceData(String... partitions) throws IOException {
// create bootstrap source data path
java.nio.file.Path sourcePath = tempDir.resolve("data");
java.nio.file.Files.createDirectories(sourcePath);
@@ -1052,7 +953,7 @@ public class TestCleaner extends HoodieClientTestBase {
// generate bootstrap index
Map<String, List<BootstrapFileMapping>> bootstrapMapping =
TestBootstrapIndex.generateBootstrapIndex(metaClient, sourcePath.toString(),
- new String[] {HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH}, 1);
+ partitions, 1);
for (Map.Entry<String, List<BootstrapFileMapping>> entry :
bootstrapMapping.entrySet()) {
new File(sourcePath.toString() + "/" + entry.getKey()).mkdirs();
@@ -1065,28 +966,29 @@ public class TestCleaner extends HoodieClientTestBase {
* Test Cleaning functionality of table.rollback() API.
*/
@Test
- public void testCleanMarkerDataFilesOnRollback() throws IOException {
- List<String> markerFiles = createMarkerFiles("000", 10);
- assertEquals(10, markerFiles.size(), "Some marker files are created.");
- assertEquals(markerFiles.size(), getTotalTempFiles(), "Some marker files
are created.");
+ public void testCleanMarkerDataFilesOnRollback() throws Exception {
+ HoodieTestTable testTable = HoodieTestTable.of(metaClient)
+ .addRequestedCommit("000")
+ .withMarkerFiles("default", 10, IOType.MERGE);
+ final int numTempFilesBefore = testTable.listAllFilesInTempFolder().size();
+ assertEquals(10, numTempFilesBefore, "Some marker files are created.");
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath).build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
- table.getActiveTimeline().createNewInstant(new
HoodieInstant(State.REQUESTED,
- HoodieTimeline.COMMIT_ACTION, "000"));
table.getActiveTimeline().transitionRequestedToInflight(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION,
"000"), Option.empty());
metaClient.reloadActiveTimeline();
table.rollback(jsc, "001", new HoodieInstant(State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, "000"), true);
- assertEquals(0, getTotalTempFiles(), "All temp files are deleted.");
+ final int numTempFilesAfter = testTable.listAllFilesInTempFolder().size();
+ assertEquals(0, numTempFilesAfter, "All temp files are deleted.");
}
/**
* Test Cleaner Stat when there are no partition paths.
*/
@Test
- public void testCleaningWithZeroPartitionPaths() throws IOException {
+ public void testCleaningWithZeroPartitionPaths() throws Exception {
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
@@ -1095,7 +997,7 @@ public class TestCleaner extends HoodieClientTestBase {
// Make a commit, although there are no partitionPaths.
// Example use-case of this is when a client wants to create a table
// with just some commit metadata, but no data/partitionPaths.
- HoodieTestUtils.createCommitFiles(basePath, "000");
+ HoodieTestTable.of(metaClient).addCommit("000");
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -1294,33 +1196,6 @@ public class TestCleaner extends HoodieClientTestBase {
"Correct number of files under compaction deleted");
}
- /**
- * Utility method to create temporary data files.
- *
- * @param instantTime Commit Timestamp
- * @param numFiles Number for files to be generated
- * @return generated files
- * @throws IOException in case of error
- */
- private List<String> createMarkerFiles(String instantTime, int numFiles)
throws IOException {
- List<String> files = new ArrayList<>();
- for (int i = 0; i < numFiles; i++) {
- files.add(HoodieClientTestUtils.createNewMarkerFile(basePath,
"2019/03/29", instantTime));
- }
- return files;
- }
-
- /***
- * Helper method to return temporary files count.
- *
- * @return Number of temporary files found
- * @throws IOException in case of error
- */
- private int getTotalTempFiles() throws IOException {
- return FileSystemTestUtils.listRecursive(fs, new Path(basePath,
HoodieTableMetaClient.TEMPFOLDER_NAME))
- .size();
- }
-
private Stream<Pair<String, String>> convertPathToFileIdWithCommitTime(final
HoodieTableMetaClient metaClient,
List<String> paths) {
Predicate<String> roFilePredicate =
diff --git
a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
index da4224a..1f638c3 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
@@ -22,8 +22,8 @@ import org.apache.hudi.common.fs.ConsistencyGuard;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
+import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.testutils.HoodieClientTestHarness;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
@@ -66,9 +66,9 @@ public class TestConsistencyGuard extends
HoodieClientTestHarness {
@ParameterizedTest
@MethodSource("consistencyGuardType")
public void testCheckPassingAppearAndDisAppear(String consistencyGuardType)
throws Exception {
- HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f1");
- HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f2");
- HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f3");
+ FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
+ FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f2");
+ FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f3");
ConsistencyGuardConfig config = getConsistencyGuardConfig(1, 1000, 1000);
ConsistencyGuard passing =
consistencyGuardType.equals(FailSafeConsistencyGuard.class.getName())
@@ -88,7 +88,7 @@ public class TestConsistencyGuard extends
HoodieClientTestHarness {
@Test
public void testCheckFailingAppearFailSafe() throws Exception {
- HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f1");
+ FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs,
getConsistencyGuardConfig());
assertThrows(TimeoutException.class, () -> {
passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays
@@ -98,7 +98,7 @@ public class TestConsistencyGuard extends
HoodieClientTestHarness {
@Test
public void testCheckFailingAppearTimedWait() throws Exception {
- HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f1");
+ FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs,
getConsistencyGuardConfig());
passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays
.asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath
+ "/partition/path/f2_1-0-2_000.parquet"));
@@ -106,7 +106,7 @@ public class TestConsistencyGuard extends
HoodieClientTestHarness {
@Test
public void testCheckFailingAppearsFailSafe() throws Exception {
- HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f1");
+ FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs,
getConsistencyGuardConfig());
assertThrows(TimeoutException.class, () -> {
passing.waitTillFileAppears(new Path(basePath +
"/partition/path/f1_1-0-2_000.parquet"));
@@ -115,14 +115,14 @@ public class TestConsistencyGuard extends
HoodieClientTestHarness {
@Test
public void testCheckFailingAppearsTimedWait() throws Exception {
- HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f1");
+ FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs,
getConsistencyGuardConfig());
passing.waitTillFileAppears(new Path(basePath +
"/partition/path/f1_1-0-2_000.parquet"));
}
@Test
public void testCheckFailingDisappearFailSafe() throws Exception {
- HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f1");
+ FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs,
getConsistencyGuardConfig());
assertThrows(TimeoutException.class, () -> {
passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays
@@ -132,7 +132,7 @@ public class TestConsistencyGuard extends
HoodieClientTestHarness {
@Test
public void testCheckFailingDisappearTimedWait() throws Exception {
- HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f1");
+ FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs,
getConsistencyGuardConfig());
passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays
.asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath
+ "/partition/path/f2_1-0-2_000.parquet"));
@@ -140,8 +140,8 @@ public class TestConsistencyGuard extends
HoodieClientTestHarness {
@Test
public void testCheckFailingDisappearsFailSafe() throws Exception {
- HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f1");
- HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f1");
+ FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
+ FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs,
getConsistencyGuardConfig());
assertThrows(TimeoutException.class, () -> {
passing.waitTillFileDisappears(new Path(basePath +
"/partition/path/f1_1-0-1_000.parquet"));
@@ -150,8 +150,8 @@ public class TestConsistencyGuard extends
HoodieClientTestHarness {
@Test
public void testCheckFailingDisappearsTimedWait() throws Exception {
- HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f1");
- HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f1");
+ FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
+ FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs,
getConsistencyGuardConfig());
passing.waitTillFileDisappears(new Path(basePath +
"/partition/path/f1_1-0-1_000.parquet"));
}
diff --git
a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java
b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java
index af679ce..55b7b50 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java
@@ -18,17 +18,17 @@
package org.apache.hudi.table;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.testutils.FileSystemTestUtils;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
-
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.io.IOType;
import org.apache.hudi.testutils.HoodieClientTestUtils;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
diff --git
a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index d8bb946..f49d6d5 100644
---
a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++
b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -34,7 +35,6 @@ import org.apache.hudi.table.HoodieCopyOnWriteTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.testutils.HoodieClientTestBase;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
@@ -73,8 +73,8 @@ public class TestUpsertPartitioner extends
HoodieClientTestBase {
.insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000
* 1024).build()).build();
- HoodieClientTestUtils.fakeCommit(basePath, "001");
- HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001",
"file1", fileSize);
+ FileCreateUtils.createCommit(basePath, "001");
+ FileCreateUtils.createDataFile(basePath, testPartitionPath, "001",
"file1", fileSize);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable)
HoodieTable.create(metaClient, config, hadoopConf);
@@ -193,7 +193,7 @@ public class TestUpsertPartitioner extends
HoodieClientTestBase {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0)
.insertSplitSize(totalInsertNum /
2).autoTuneInsertSplits(false).build()).build();
- HoodieClientTestUtils.fakeCommit(basePath, "001");
+ FileCreateUtils.createCommit(basePath, "001");
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable)
HoodieTable.create(metaClient, config, hadoopConf);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new
String[] {testPartitionPath});
diff --git
a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 1529d79..c044bee 100644
---
a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++
b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -53,6 +53,9 @@ import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.stream.Collectors;
+import static
org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit;
+import static
org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit;
+import static
org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -166,7 +169,9 @@ public class TestHoodieCompactor extends
HoodieClientTestHarness {
assertEquals(1, fileSlice.getLogFiles().count(), "There should be 1
log file written for every data file");
}
}
- HoodieTestUtils.createDeltaCommitFiles(basePath, newCommitTime);
+ createDeltaCommit(basePath, newCommitTime);
+ createRequestedDeltaCommit(basePath, newCommitTime);
+ createInflightDeltaCommit(basePath, newCommitTime);
// Do a compaction
table = HoodieTable.create(config, hadoopConf);
diff --git
a/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
index c6652ed..83e7ea0 100644
---
a/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
+++
b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
@@ -20,16 +20,14 @@ package org.apache.hudi.table.action.rollback;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.testutils.FileSystemTestUtils;
-import org.apache.hudi.io.IOType;
+import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestBase;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -55,38 +53,20 @@ public class TestMarkerBasedRollbackStrategy extends
HoodieClientTestBase {
cleanupResources();
}
- private void givenCommit0(boolean isDeltaCommit) throws Exception {
- HoodieClientTestUtils.fakeDataFile(basePath, "partA", "000", "f2");
- if (isDeltaCommit) {
- HoodieClientTestUtils.fakeDeltaCommit(basePath, "000");
- } else {
- HoodieClientTestUtils.fakeCommit(basePath, "000");
- }
- }
-
- private void givenInflightCommit1(boolean isDeltaCommit) throws Exception {
- HoodieClientTestUtils.fakeDataFile(basePath, "partB", "001", "f1");
- HoodieClientTestUtils.createMarkerFile(basePath, "partB", "001", "f1",
IOType.CREATE);
-
- HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f3",
IOType.CREATE);
-
- if (isDeltaCommit) {
- HoodieClientTestUtils.fakeLogFile(basePath, "partA", "001", "f2", 0);
- HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f2",
IOType.APPEND);
- HoodieClientTestUtils.createMarkerFile(basePath, "partB", "001", "f4",
IOType.APPEND);
- HoodieClientTestUtils.fakeInflightDeltaCommit(basePath, "001");
- } else {
- HoodieClientTestUtils.fakeDataFile(basePath, "partA", "001", "f2");
- HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f2",
IOType.MERGE);
- HoodieClientTestUtils.fakeInFlightCommit(basePath, "001");
- }
- }
-
@Test
public void testCopyOnWriteRollback() throws Exception {
// given: wrote some base files and corresponding markers
- givenCommit0(false);
- givenInflightCommit1(false);
+ HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+ String f0 = testTable.addRequestedCommit("000")
+ .withInserts("partA").get("partA");
+ String f1 = testTable.addCommit("001")
+ .withUpdates("partA", f0)
+ .withInserts("partB").get("partB");
+ String f2 = "f2";
+ testTable.forCommit("001")
+ .withMarkerFile("partA", f0, IOType.MERGE)
+ .withMarkerFile("partB", f1, IOType.CREATE)
+ .withMarkerFile("partA", f2, IOType.CREATE);
// when
List<HoodieRollbackStat> stats = new
MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(),
hadoopConf), jsc, getConfig(), "002")
@@ -95,8 +75,8 @@ public class TestMarkerBasedRollbackStrategy extends
HoodieClientTestBase {
// then: ensure files are deleted correctly, non-existent files reported
as failed deletes
assertEquals(2, stats.size());
- List<FileStatus> partAFiles = FileSystemTestUtils.listRecursive(fs, new
Path(basePath + "/partA"));
- List<FileStatus> partBFiles = FileSystemTestUtils.listRecursive(fs, new
Path(basePath + "/partB"));
+ List<FileStatus> partAFiles = testTable.listAllFiles("partA");
+ List<FileStatus> partBFiles = testTable.listAllFiles("partB");
assertEquals(0, partBFiles.size());
assertEquals(1, partAFiles.size());
@@ -107,8 +87,19 @@ public class TestMarkerBasedRollbackStrategy extends
HoodieClientTestBase {
@Test
public void testMergeOnReadRollback() throws Exception {
// given: wrote some base + log files and corresponding markers
- givenCommit0(true);
- givenInflightCommit1(true);
+ HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+ String f2 = testTable.addRequestedDeltaCommit("000")
+ .withInserts("partA").get("partA");
+ String f1 = testTable.addDeltaCommit("001")
+ .withLogFile("partA", f2)
+ .withInserts("partB").get("partB");
+ String f3 = "f3";
+ String f4 = "f4";
+ testTable.forDeltaCommit("001")
+ .withMarkerFile("partB", f1, IOType.CREATE)
+ .withMarkerFile("partA", f3, IOType.CREATE)
+ .withMarkerFile("partA", f2, IOType.APPEND)
+ .withMarkerFile("partB", f4, IOType.APPEND);
// when
List<HoodieRollbackStat> stats = new
MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(),
hadoopConf), jsc, getConfig(), "002")
@@ -117,12 +108,12 @@ public class TestMarkerBasedRollbackStrategy extends
HoodieClientTestBase {
// then: ensure files are deleted, rollback block is appended (even if
append does not exist)
assertEquals(2, stats.size());
// will have the log file
- List<FileStatus> partBFiles = FileSystemTestUtils.listRecursive(fs, new
Path(basePath + "/partB"));
+ List<FileStatus> partBFiles = testTable.listAllFiles("partB");
assertEquals(1, partBFiles.size());
assertTrue(partBFiles.get(0).getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()));
assertTrue(partBFiles.get(0).getLen() > 0);
- List<FileStatus> partAFiles = FileSystemTestUtils.listRecursive(fs, new
Path(basePath + "/partA"));
+ List<FileStatus> partAFiles = testTable.listAllFiles("partA");
assertEquals(3, partAFiles.size());
assertEquals(2, partAFiles.stream().filter(s ->
s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count());
assertEquals(1, partAFiles.stream().filter(s ->
s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f
-> f.getLen() > 0).count());
diff --git
a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 4aaf585..353b34c 100644
---
a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++
b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -38,7 +38,6 @@ import
org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.io.IOType;
import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
import org.apache.hudi.io.storage.HoodieParquetWriter;
@@ -57,14 +56,11 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
-import java.io.File;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -74,57 +70,6 @@ import java.util.stream.Collectors;
public class HoodieClientTestUtils {
private static final Logger LOG =
LogManager.getLogger(HoodieClientTestUtils.class);
- public static final String DEFAULT_WRITE_TOKEN = "1-0-1";
-
- private static void fakeMetaFile(String basePath, String instantTime, String
suffix) throws IOException {
- String parentPath = basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME;
- new File(parentPath).mkdirs();
- new File(parentPath + "/" + instantTime + suffix).createNewFile();
- }
-
- public static void fakeCommit(String basePath, String instantTime) throws
IOException {
- fakeMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION);
- }
-
- public static void fakeDeltaCommit(String basePath, String instantTime)
throws IOException {
- fakeMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION);
- }
-
- public static void fakeInflightDeltaCommit(String basePath, String
instantTime) throws IOException {
- fakeMetaFile(basePath, instantTime,
HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION);
- }
-
- public static void fakeInFlightCommit(String basePath, String instantTime)
throws IOException {
- fakeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_EXTENSION);
- }
-
- public static void fakeDataFile(String basePath, String partitionPath,
String instantTime, String fileId)
- throws Exception {
- fakeDataFile(basePath, partitionPath, instantTime, fileId, 0);
- }
-
- public static void fakeDataFile(String basePath, String partitionPath,
String instantTime, String fileId, long length)
- throws Exception {
- String parentPath = String.format("%s/%s", basePath, partitionPath);
- new File(parentPath).mkdirs();
- String path = String.format("%s/%s", parentPath,
FSUtils.makeDataFileName(instantTime, "1-0-1", fileId));
- new File(path).createNewFile();
- new RandomAccessFile(path, "rw").setLength(length);
- }
-
- public static void fakeLogFile(String basePath, String partitionPath, String
baseInstantTime, String fileId, int version)
- throws Exception {
- fakeLogFile(basePath, partitionPath, baseInstantTime, fileId, version, 0);
- }
-
- public static void fakeLogFile(String basePath, String partitionPath, String
baseInstantTime, String fileId, int version, int length)
- throws Exception {
- String parentPath = String.format("%s/%s", basePath, partitionPath);
- new File(parentPath).mkdirs();
- String path = String.format("%s/%s", parentPath,
FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(),
baseInstantTime, version, "1-0-1"));
- new File(path).createNewFile();
- new RandomAccessFile(path, "rw").setLength(length);
- }
/**
* Returns a Spark config for this test.
@@ -153,8 +98,8 @@ public class HoodieClientTestUtils {
return HoodieReadClient.addHoodieSupport(sparkConf);
}
- public static HashMap<String, String> getLatestFileIDsToFullPath(String
basePath, HoodieTimeline commitTimeline,
-
List<HoodieInstant> commitsToReturn) throws IOException {
+ private static HashMap<String, String> getLatestFileIDsToFullPath(String
basePath, HoodieTimeline commitTimeline,
+
List<HoodieInstant> commitsToReturn) throws IOException {
HashMap<String, String> fileIdToFullPath = new HashMap<>();
for (HoodieInstant commit : commitsToReturn) {
HoodieCommitMetadata metadata =
@@ -226,25 +171,8 @@ public class HoodieClientTestUtils {
}
/**
- * Find total basefiles for passed in paths.
+ * TODO Incorporate into {@link
org.apache.hudi.common.testutils.HoodieTestTable}.
*/
- public static Map<String, Integer> getBaseFileCountForPaths(String basePath,
FileSystem fs,
- String... paths) {
- Map<String, Integer> toReturn = new HashMap<>();
- try {
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(fs.getConf(), basePath, true);
- for (String path : paths) {
- BaseFileOnlyView fileSystemView = new
HoodieTableFileSystemView(metaClient,
- metaClient.getCommitsTimeline().filterCompletedInstants(),
fs.globStatus(new Path(path)));
- List<HoodieBaseFile> latestFiles =
fileSystemView.getLatestBaseFiles().collect(Collectors.toList());
- toReturn.put(path, latestFiles.size());
- }
- return toReturn;
- } catch (Exception e) {
- throw new HoodieException("Error reading hoodie table as a dataframe",
e);
- }
- }
-
public static String writeParquetFile(String basePath, String partitionPath,
String filename,
List<HoodieRecord> records, Schema
schema, BloomFilter filter, boolean createCommitTime) throws IOException {
@@ -278,6 +206,9 @@ public class HoodieClientTestUtils {
return filename;
}
+ /**
+ * TODO Incorporate into {@link
org.apache.hudi.common.testutils.HoodieTestTable}.
+ */
public static String writeParquetFile(String basePath, String partitionPath,
List<HoodieRecord> records,
Schema schema, BloomFilter filter,
boolean createCommitTime) throws IOException, InterruptedException {
Thread.sleep(1000);
@@ -289,49 +220,4 @@ public class HoodieClientTestUtils {
createCommitTime);
}
- public static String createNewMarkerFile(String basePath, String
partitionPath, String instantTime)
- throws IOException {
- return createMarkerFile(basePath, partitionPath, instantTime);
- }
-
- public static String createMarkerFile(String basePath, String partitionPath,
String instantTime)
- throws IOException {
- return createMarkerFile(basePath, partitionPath, instantTime,
UUID.randomUUID().toString(), IOType.MERGE);
- }
-
- public static String createMarkerFile(String basePath, String partitionPath,
String instantTime, String fileID, IOType ioType)
- throws IOException {
- String folderPath = basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME
+ "/" + instantTime + "/" + partitionPath + "/";
- new File(folderPath).mkdirs();
- String markerFileName = String.format("%s_%s_%s%s%s.%s", fileID,
DEFAULT_WRITE_TOKEN, instantTime,
- HoodieFileFormat.PARQUET.getFileExtension(),
HoodieTableMetaClient.MARKER_EXTN, ioType);
- File f = new File(folderPath + markerFileName);
- f.createNewFile();
- return f.getAbsolutePath();
- }
-
- public static void createMarkerFile(String basePath, String partitionPath,
String instantTime, String dataFileName) throws IOException {
- createTempFolderForMarkerFiles(basePath);
- String folderPath = getTempFolderName(basePath);
- // create dir for this instant
- new File(folderPath + "/" + instantTime + "/" + partitionPath).mkdirs();
- new File(folderPath + "/" + instantTime + "/" + partitionPath + "/" +
dataFileName + ".marker.MERGE").createNewFile();
- }
-
- public static int getTotalMarkerFileCount(String basePath, String
partitionPath, String instantTime) {
- String folderPath = getTempFolderName(basePath);
- File markerDir = new File(folderPath + "/" + instantTime + "/" +
partitionPath);
- if (markerDir.exists()) {
- return markerDir.listFiles((dir, name) ->
name.contains(".marker.MERGE")).length;
- }
- return 0;
- }
-
- public static void createTempFolderForMarkerFiles(String basePath) {
- new File(basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME).mkdirs();
- }
-
- public static String getTempFolderName(String basePath) {
- return basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME;
- }
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/IOType.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/IOType.java
similarity index 57%
rename from hudi-client/src/main/java/org/apache/hudi/io/IOType.java
rename to hudi-common/src/main/java/org/apache/hudi/common/model/IOType.java
index aa6660e..bd29ff0 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/IOType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/IOType.java
@@ -7,16 +7,17 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.io;
+package org.apache.hudi.common.model;
/**
* Types of lower level I/O operations done on each file slice.
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
new file mode 100644
index 0000000..987b567
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+public class FileCreateUtils {
+
+ private static void createMetaFile(String basePath, String instantTime,
String suffix) throws IOException {
+ Path parentPath = Paths.get(basePath,
HoodieTableMetaClient.METAFOLDER_NAME);
+ Files.createDirectories(parentPath);
+ Path metaFilePath = parentPath.resolve(instantTime + suffix);
+ if (Files.notExists(metaFilePath)) {
+ Files.createFile(metaFilePath);
+ }
+ }
+
+ public static void createCommit(String basePath, String instantTime) throws
IOException {
+ createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION);
+ }
+
+ public static void createRequestedCommit(String basePath, String
instantTime) throws IOException {
+ createMetaFile(basePath, instantTime,
HoodieTimeline.REQUESTED_COMMIT_EXTENSION);
+ }
+
+ public static void createInflightCommit(String basePath, String instantTime)
throws IOException {
+ createMetaFile(basePath, instantTime,
HoodieTimeline.INFLIGHT_COMMIT_EXTENSION);
+ }
+
+ public static void createDeltaCommit(String basePath, String instantTime)
throws IOException {
+ createMetaFile(basePath, instantTime,
HoodieTimeline.DELTA_COMMIT_EXTENSION);
+ }
+
+ public static void createRequestedDeltaCommit(String basePath, String
instantTime) throws IOException {
+ createMetaFile(basePath, instantTime,
HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION);
+ }
+
+ public static void createInflightDeltaCommit(String basePath, String
instantTime) throws IOException {
+ createMetaFile(basePath, instantTime,
HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION);
+ }
+
+ public static void createDataFile(String basePath, String partitionPath,
String instantTime, String fileId)
+ throws Exception {
+ createDataFile(basePath, partitionPath, instantTime, fileId, 0);
+ }
+
+ public static void createDataFile(String basePath, String partitionPath,
String instantTime, String fileId, long length)
+ throws Exception {
+ Path parentPath = Paths.get(basePath, partitionPath);
+ Files.createDirectories(parentPath);
+ Path dataFilePath =
parentPath.resolve(FSUtils.makeDataFileName(instantTime, "1-0-1", fileId));
+ if (Files.notExists(dataFilePath)) {
+ Files.createFile(dataFilePath);
+ }
+ new RandomAccessFile(dataFilePath.toFile(), "rw").setLength(length);
+ }
+
+ public static void createLogFile(String basePath, String partitionPath,
String baseInstantTime, String fileId, int version)
+ throws Exception {
+ createLogFile(basePath, partitionPath, baseInstantTime, fileId, version,
0);
+ }
+
+ public static void createLogFile(String basePath, String partitionPath,
String baseInstantTime, String fileId, int version, int length)
+ throws Exception {
+ Path parentPath = Paths.get(basePath, partitionPath);
+ Files.createDirectories(parentPath);
+ Path logFilePath = parentPath.resolve(FSUtils.makeLogFileName(fileId,
HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseInstantTime, version,
"1-0-1"));
+ if (Files.notExists(logFilePath)) {
+ Files.createFile(logFilePath);
+ }
+ new RandomAccessFile(logFilePath.toFile(), "rw").setLength(length);
+ }
+
+ public static String createMarkerFile(String basePath, String partitionPath,
String instantTime, String fileID, IOType ioType)
+ throws IOException {
+ Path folderPath = Paths.get(basePath,
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
+ Files.createDirectories(folderPath);
+ String markerFileName = String.format("%s_%s_%s%s%s.%s", fileID, "1-0-1",
instantTime,
+ HoodieFileFormat.PARQUET.getFileExtension(),
HoodieTableMetaClient.MARKER_EXTN, ioType);
+ Path markerFilePath = folderPath.resolve(markerFileName);
+ if (Files.notExists(markerFilePath)) {
+ Files.createFile(markerFilePath);
+ }
+ return markerFilePath.toAbsolutePath().toString();
+ }
+
+ public static long getTotalMarkerFileCount(String basePath, String
partitionPath, String instantTime, IOType ioType) throws IOException {
+ Path markerDir = Paths.get(basePath,
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
+ if (Files.notExists(markerDir)) {
+ return 0;
+ }
+ return Files.list(markerDir).filter(p -> p.getFileName().toString()
+ .endsWith(String.format("%s.%s", HoodieTableMetaClient.MARKER_EXTN,
ioType))).count();
+ }
+
+ /**
+ * Find total basefiles for passed in paths.
+ */
+ public static Map<String, Long> getBaseFileCountsForPaths(String basePath,
FileSystem fs, String... paths) {
+ Map<String, Long> toReturn = new HashMap<>();
+ try {
+ HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(fs.getConf(), basePath, true);
+ for (String path : paths) {
+ TableFileSystemView.BaseFileOnlyView fileSystemView = new
HoodieTableFileSystemView(metaClient,
+ metaClient.getCommitsTimeline().filterCompletedInstants(),
fs.globStatus(new org.apache.hadoop.fs.Path(path)));
+ toReturn.put(path, fileSystemView.getLatestBaseFiles().count());
+ }
+ return toReturn;
+ } catch (Exception e) {
+ throw new HoodieException("Error reading hoodie table as a dataframe",
e);
+ }
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
new file mode 100644
index 0000000..32f2d45
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.stream.IntStream;
+
+import static org.apache.hudi.common.testutils.FileCreateUtils.createCommit;
+import static
org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit;
+import static
org.apache.hudi.common.testutils.FileCreateUtils.createInflightCommit;
+import static
org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit;
+import static
org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile;
+import static
org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCommit;
+import static
org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit;
+
+public class HoodieTestTable {
+
+ private final String basePath;
+ private final FileSystem fs;
+ private HoodieTableMetaClient metaClient;
+ private String currentInstantTime;
+
+ private HoodieTestTable(String basePath, FileSystem fs,
HoodieTableMetaClient metaClient) {
+ ValidationUtils.checkArgument(Objects.equals(basePath,
metaClient.getBasePath()));
+ ValidationUtils.checkArgument(Objects.equals(fs, metaClient.getRawFs()));
+ this.basePath = basePath;
+ this.fs = fs;
+ this.metaClient = metaClient;
+ }
+
+ public static HoodieTestTable of(HoodieTableMetaClient metaClient) {
+ return new HoodieTestTable(metaClient.getBasePath(),
metaClient.getRawFs(), metaClient);
+ }
+
+ public HoodieTestTable addRequestedCommit(String instantTime) throws
Exception {
+ createRequestedCommit(basePath, instantTime);
+ currentInstantTime = instantTime;
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ return this;
+ }
+
+ public HoodieTestTable addRequestedDeltaCommit(String instantTime) throws
Exception {
+ createRequestedDeltaCommit(basePath, instantTime);
+ currentInstantTime = instantTime;
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ return this;
+ }
+
+ public HoodieTestTable addInflightCommit(String instantTime) throws
Exception {
+ createRequestedCommit(basePath, instantTime);
+ createInflightCommit(basePath, instantTime);
+ currentInstantTime = instantTime;
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ return this;
+ }
+
+ public HoodieTestTable addInflightDeltaCommit(String instantTime) throws
Exception {
+ createRequestedDeltaCommit(basePath, instantTime);
+ createInflightDeltaCommit(basePath, instantTime);
+ currentInstantTime = instantTime;
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ return this;
+ }
+
+ public HoodieTestTable addCommit(String instantTime) throws Exception {
+ createRequestedCommit(basePath, instantTime);
+ createInflightCommit(basePath, instantTime);
+ createCommit(basePath, instantTime);
+ currentInstantTime = instantTime;
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ return this;
+ }
+
+ public HoodieTestTable addDeltaCommit(String instantTime) throws Exception {
+ createRequestedDeltaCommit(basePath, instantTime);
+ createInflightDeltaCommit(basePath, instantTime);
+ createDeltaCommit(basePath, instantTime);
+ currentInstantTime = instantTime;
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ return this;
+ }
+
+ public HoodieTestTable forCommit(String instantTime) {
+ currentInstantTime = instantTime;
+ return this;
+ }
+
+ public HoodieTestTable forDeltaCommit(String instantTime) {
+ currentInstantTime = instantTime;
+ return this;
+ }
+
+ public HoodieTestTable withMarkerFile(String partitionPath, IOType ioType)
throws IOException {
+ return withMarkerFile(partitionPath, UUID.randomUUID().toString(), ioType);
+ }
+
+ public HoodieTestTable withMarkerFile(String partitionPath, String fileId,
IOType ioType) throws IOException {
+ createMarkerFile(basePath, partitionPath, currentInstantTime, fileId,
ioType);
+ return this;
+ }
+
+ public HoodieTestTable withMarkerFiles(String partitionPath, int num, IOType
ioType) throws IOException {
+ String[] fileIds = IntStream.range(0, num).mapToObj(i ->
UUID.randomUUID().toString()).toArray(String[]::new);
+ return withMarkerFiles(partitionPath, fileIds, ioType);
+ }
+
+ public HoodieTestTable withMarkerFiles(String partitionPath, String[]
fileIds, IOType ioType) throws IOException {
+ for (String fileId : fileIds) {
+ createMarkerFile(basePath, partitionPath, currentInstantTime, fileId,
ioType);
+ }
+ return this;
+ }
+
+ /**
+ * Insert one base file to each of the given distinct partitions.
+ *
+ * @return A {@link Map} of partition and its newly inserted file's id.
+ */
+ public Map<String, String> withInserts(String... partitions) throws
Exception {
+ Map<String, String> partitionFileIdMap = new HashMap<>();
+ for (String p : partitions) {
+ String fileId = UUID.randomUUID().toString();
+ FileCreateUtils.createDataFile(basePath, p, currentInstantTime, fileId);
+ partitionFileIdMap.put(p, fileId);
+ }
+ return partitionFileIdMap;
+ }
+
+ public HoodieTestTable withUpdates(String partition, String... fileIds)
throws Exception {
+ for (String f : fileIds) {
+ FileCreateUtils.createDataFile(basePath, partition, currentInstantTime,
f);
+ }
+ return this;
+ }
+
+ public String withLogFile(String partitionPath) throws Exception {
+ String fileId = UUID.randomUUID().toString();
+ withLogFile(partitionPath, fileId);
+ return fileId;
+ }
+
+ public HoodieTestTable withLogFile(String partitionPath, String fileId)
throws Exception {
+ return withLogFile(partitionPath, fileId, 0);
+ }
+
+ public HoodieTestTable withLogFile(String partitionPath, String fileId, int
version) throws Exception {
+ FileCreateUtils.createLogFile(basePath, partitionPath, currentInstantTime,
fileId, version);
+ return this;
+ }
+
+ public boolean filesExist(Map<String, String> partitionAndFileId, String
instantTime) {
+ return partitionAndFileId.entrySet().stream().allMatch(entry -> {
+ String partition = entry.getKey();
+ String fileId = entry.getValue();
+ return fileExists(partition, instantTime, fileId);
+ });
+ }
+
+ public boolean filesExist(String partition, String instantTime, String...
fileIds) {
+ return Arrays.stream(fileIds).allMatch(f -> fileExists(partition,
instantTime, f));
+ }
+
+ public boolean fileExists(String partition, String instantTime, String
fileId) {
+ try {
+ return fs.exists(new Path(Paths.get(basePath, partition,
+ FSUtils.makeDataFileName(instantTime, "1-0-1", fileId)).toString()));
+ } catch (IOException e) {
+ throw new HoodieTestTableException(e);
+ }
+ }
+
+ public boolean logFilesExist(String partition, String instantTime, String
fileId, int... versions) {
+ return Arrays.stream(versions).allMatch(v -> logFileExists(partition,
instantTime, fileId, v));
+ }
+
+ public boolean logFileExists(String partition, String instantTime, String
fileId, int version) {
+ try {
+ return fs.exists(new Path(Paths.get(basePath, partition,
+ FSUtils.makeLogFileName(fileId,
HoodieFileFormat.HOODIE_LOG.getFileExtension(), instantTime, version,
"1-0-1")).toString()));
+ } catch (IOException e) {
+ throw new HoodieTestTableException(e);
+ }
+ }
+
+ public List<FileStatus> listAllFiles(String partitionPath) throws
IOException {
+ return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath,
partitionPath).toString()));
+ }
+
+ public List<FileStatus> listAllFilesInTempFolder() throws IOException {
+ return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath,
HoodieTableMetaClient.TEMPFOLDER_NAME).toString()));
+ }
+
+ public static class HoodieTestTableException extends RuntimeException {
+ public HoodieTestTableException(Throwable t) {
+ super(t);
+ }
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index c88ea51..783bda4 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -100,7 +100,6 @@ import static org.junit.jupiter.api.Assertions.fail;
*/
public class HoodieTestUtils {
- public static final String TEST_EXTENSION = ".test";
public static final String RAW_TRIPS_TEST_NAME = "raw_trips";
public static final String DEFAULT_WRITE_TOKEN = "1-0-1";
public static final int DEFAULT_LOG_VERSION = 1;
@@ -167,6 +166,9 @@ public class HoodieTestUtils {
return COMMIT_FORMATTER.format(new Date());
}
+ /**
+ * @deprecated Use {@link HoodieTestTable} instead.
+ */
public static void createCommitFiles(String basePath, String...
instantTimes) throws IOException {
for (String instantTime : instantTimes) {
new File(
@@ -177,20 +179,6 @@ public class HoodieTestUtils {
+
HoodieTimeline.makeInflightCommitFileName(instantTime)).createNewFile();
new File(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" +
HoodieTimeline.makeCommitFileName(instantTime))
- .createNewFile();
- }
- }
-
- public static void createDeltaCommitFiles(String basePath, String...
instantTimes) throws IOException {
- for (String instantTime : instantTimes) {
- new File(
- basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
- +
HoodieTimeline.makeRequestedDeltaFileName(instantTime)).createNewFile();
- new File(
- basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
- +
HoodieTimeline.makeInflightDeltaFileName(instantTime)).createNewFile();
- new File(
- basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" +
HoodieTimeline.makeDeltaFileName(instantTime))
.createNewFile();
}
}
@@ -199,6 +187,9 @@ public class HoodieTestUtils {
new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME).mkdirs();
}
+ /**
+ * @deprecated Use {@link HoodieTestTable} instead.
+ */
public static void createInflightCommitFiles(String basePath, String...
instantTimes) throws IOException {
for (String instantTime : instantTimes) {
@@ -212,11 +203,12 @@ public class HoodieTestUtils {
public static void createPendingCleanFiles(HoodieTableMetaClient metaClient,
String... instantTimes) {
for (String instantTime : instantTimes) {
Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(instantTime),
- HoodieTimeline.makeInflightCleanerFileName(instantTime)).forEach(f
-> {
+ HoodieTimeline.makeInflightCleanerFileName(instantTime))
+ .forEach(f -> {
FSDataOutputStream os = null;
try {
- Path commitFile = new Path(
- metaClient.getBasePath() + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/" + f);
+ Path commitFile = new Path(Paths
+ .get(metaClient.getBasePath(),
HoodieTableMetaClient.METAFOLDER_NAME, f).toString());
os = metaClient.getFs().create(commitFile, true);
// Write empty clean metadata
os.write(TimelineMetadataUtils.serializeCleanerPlan(
@@ -239,11 +231,12 @@ public class HoodieTestUtils {
public static void createCorruptedPendingCleanFiles(HoodieTableMetaClient
metaClient, String commitTime) {
Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(commitTime),
- HoodieTimeline.makeInflightCleanerFileName(commitTime)).forEach(f -> {
+ HoodieTimeline.makeInflightCleanerFileName(commitTime))
+ .forEach(f -> {
FSDataOutputStream os = null;
try {
- Path commitFile = new Path(
- metaClient.getBasePath() + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/" + f);
+ Path commitFile = new Path(Paths
+ .get(metaClient.getBasePath(),
HoodieTableMetaClient.METAFOLDER_NAME, f).toString());
os = metaClient.getFs().create(commitFile, true);
// Write empty clean metadata
os.write(new byte[0]);
@@ -261,18 +254,18 @@ public class HoodieTestUtils {
});
}
- public static String createNewDataFile(String basePath, String
partitionPath, String instantTime)
- throws IOException {
- String fileID = UUID.randomUUID().toString();
- return createDataFile(basePath, partitionPath, instantTime, fileID);
- }
-
+ /**
+ * @deprecated Use {@link HoodieTestTable} instead.
+ */
public static String createNewDataFile(String basePath, String
partitionPath, String instantTime, long length)
throws IOException {
String fileID = UUID.randomUUID().toString();
return createDataFileFixLength(basePath, partitionPath, instantTime,
fileID, length);
}
+ /**
+ * @deprecated Use {@link HoodieTestTable} instead.
+ */
public static String createDataFile(String basePath, String partitionPath,
String instantTime, String fileID)
throws IOException {
String folderPath = basePath + "/" + partitionPath + "/";
@@ -281,7 +274,7 @@ public class HoodieTestUtils {
return fileID;
}
- public static String createDataFileFixLength(String basePath, String
partitionPath, String instantTime, String fileID,
+ private static String createDataFileFixLength(String basePath, String
partitionPath, String instantTime, String fileID,
long length) throws IOException {
String folderPath = basePath + "/" + partitionPath + "/";
Files.createDirectories(Paths.get(folderPath));
@@ -293,6 +286,9 @@ public class HoodieTestUtils {
return fileID;
}
+ /**
+ * @deprecated Use {@link HoodieTestTable} instead.
+ */
public static String createNewLogFile(FileSystem fs, String basePath, String
partitionPath, String instantTime,
String fileID, Option<Integer> version) throws IOException {
String folderPath = basePath + "/" + partitionPath + "/";
@@ -309,17 +305,6 @@ public class HoodieTestUtils {
return fileID;
}
- public static void createCompactionCommitFiles(FileSystem fs, String
basePath, String... instantTimes)
- throws IOException {
- for (String instantTime : instantTimes) {
- boolean createFile = fs.createNewFile(new Path(basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/"
- + HoodieTimeline.makeCommitFileName(instantTime)));
- if (!createFile) {
- throw new IOException("cannot create commit file for commit " +
instantTime);
- }
- }
- }
-
public static void createCompactionRequest(HoodieTableMetaClient metaClient,
String instant,
List<Pair<String, FileSlice>> fileSliceList) throws IOException {
HoodieCompactionPlan plan =
CompactionUtils.buildFromFileSlices(fileSliceList, Option.empty(),
Option.empty());
@@ -328,10 +313,16 @@ public class HoodieTestUtils {
TimelineMetadataUtils.serializeCompactionPlan(plan));
}
+ /**
+ * @deprecated Use {@link HoodieTestTable} instead.
+ */
public static String getDataFilePath(String basePath, String partitionPath,
String instantTime, String fileID) {
return basePath + "/" + partitionPath + "/" +
FSUtils.makeDataFileName(instantTime, DEFAULT_WRITE_TOKEN, fileID);
}
+ /**
+ * @deprecated Use {@link HoodieTestTable} instead.
+ */
public static String getLogFilePath(String basePath, String partitionPath,
String instantTime, String fileID,
Option<Integer> version) {
return basePath + "/" + partitionPath + "/" +
FSUtils.makeLogFileName(fileID, ".log", instantTime,
@@ -342,32 +333,39 @@ public class HoodieTestUtils {
return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" +
instantTime + HoodieTimeline.COMMIT_EXTENSION;
}
+ /**
+ * @deprecated Use {@link HoodieTestTable} instead.
+ */
public static String getInflightCommitFilePath(String basePath, String
instantTime) {
return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" +
instantTime
+ HoodieTimeline.INFLIGHT_COMMIT_EXTENSION;
}
+ /**
+ * @deprecated Use {@link HoodieTestTable} instead.
+ */
public static String getRequestedCompactionFilePath(String basePath, String
instantTime) {
return basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" +
instantTime
+ HoodieTimeline.INFLIGHT_COMMIT_EXTENSION;
}
+ /**
+ * @deprecated Use {@link HoodieTestTable} instead.
+ */
public static boolean doesDataFileExist(String basePath, String
partitionPath, String instantTime,
String fileID) {
return new File(getDataFilePath(basePath, partitionPath, instantTime,
fileID)).exists();
}
- public static boolean doesLogFileExist(String basePath, String
partitionPath, String instantTime, String fileID,
- Option<Integer> version) {
- return new File(getLogFilePath(basePath, partitionPath, instantTime,
fileID, version)).exists();
- }
-
public static boolean doesCommitExist(String basePath, String instantTime) {
return new File(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" +
instantTime + HoodieTimeline.COMMIT_EXTENSION)
.exists();
}
+ /**
+ * @deprecated Use {@link HoodieTestTable} instead.
+ */
public static boolean doesInflightExist(String basePath, String instantTime)
{
return new File(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" +
instantTime + HoodieTimeline.INFLIGHT_EXTENSION)