xushiyan commented on code in PR #7251:
URL: https://github.com/apache/hudi/pull/7251#discussion_r1027284761
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java:
##########
@@ -228,32 +212,63 @@ public void testInsertAndCleanFailedWritesByVersions()
throws Exception {
}
/**
- * Test Clean-By-Versions using prepped versions of insert/upsert API.
+ * Test Helper for cleaning failed writes by versions logic from
HoodieWriteClient API perspective.
+ *
+ * @param insertFn Insert API to be tested
+ * @param isPreppedAPI Flag to indicate if a prepped-version is used. If
true, a wrapper function will be used during
+ * record generation to also tag the records (de-dupe is
implicit as we use unique record-gen APIs)
+ * @throws Exception in case of errors
*/
- @Test
- public void testInsertPreppedAndCleanByVersions() throws Exception {
- testInsertAndCleanByVersions(SparkRDDWriteClient::insertPreppedRecords,
SparkRDDWriteClient::upsertPreppedRecords,
- true);
- }
+ private void testInsertAndCleanFailedWritesByVersions(
+ Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient,
JavaRDD<HoodieRecord>, String> insertFn, boolean isPreppedAPI)
+ throws Exception {
+ int maxVersions = 3; // keep up to 3 versions for each file
+ HoodieWriteConfig cfg = getConfigBuilder()
+ .withAutoCommit(false)
+ .withHeartbeatIntervalInMs(3000)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(maxVersions).build())
+ .withParallelism(1,
1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
+
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+ .build();
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
- /**
- * Test Clean-By-Versions using bulk-insert/upsert API.
- */
- @Test
- public void testBulkInsertAndCleanByVersions() throws Exception {
Review Comment:
this moved to `TestCleanerInsertAndCleanByVersions`
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java:
##########
@@ -228,32 +212,63 @@ public void testInsertAndCleanFailedWritesByVersions()
throws Exception {
}
/**
- * Test Clean-By-Versions using prepped versions of insert/upsert API.
+ * Test Helper for cleaning failed writes by versions logic from
HoodieWriteClient API perspective.
+ *
+ * @param insertFn Insert API to be tested
+ * @param isPreppedAPI Flag to indicate if a prepped-version is used. If
true, a wrapper function will be used during
+ * record generation to also tag the records (de-dupe is
implicit as we use unique record-gen APIs)
+ * @throws Exception in case of errors
*/
- @Test
- public void testInsertPreppedAndCleanByVersions() throws Exception {
- testInsertAndCleanByVersions(SparkRDDWriteClient::insertPreppedRecords,
SparkRDDWriteClient::upsertPreppedRecords,
- true);
- }
+ private void testInsertAndCleanFailedWritesByVersions(
+ Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient,
JavaRDD<HoodieRecord>, String> insertFn, boolean isPreppedAPI)
+ throws Exception {
+ int maxVersions = 3; // keep up to 3 versions for each file
+ HoodieWriteConfig cfg = getConfigBuilder()
+ .withAutoCommit(false)
+ .withHeartbeatIntervalInMs(3000)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(maxVersions).build())
+ .withParallelism(1,
1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
+
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+ .build();
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
- /**
- * Test Clean-By-Versions using bulk-insert/upsert API.
- */
- @Test
- public void testBulkInsertAndCleanByVersions() throws Exception {
- testInsertAndCleanByVersions(SparkRDDWriteClient::bulkInsert,
SparkRDDWriteClient::upsert, false);
- }
+ final Function2<List<HoodieRecord>, String, Integer>
recordInsertGenWrappedFunction =
+ generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
- /**
- * Test Clean-By-Versions using prepped versions of bulk-insert/upsert API.
- */
- @Test
- public void testBulkInsertPreppedAndCleanByVersions() throws Exception {
Review Comment:
this moved to `TestCleanerInsertAndCleanByVersions`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]