This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 40d534e878bb8ceb4bc7fa20539cca3debd5727e
Author: Danny Chan <[email protected]>
AuthorDate: Thu Feb 2 14:49:53 2023 +0800

    [HUDI-5647] Automate savepoint and restore tests (#7796)
---
 .../TestSavepointRestoreCopyOnWrite.java           | 173 ++++++++++++++
 .../TestSavepointRestoreMergeOnRead.java           | 248 +++++++++++++++++++++
 .../hudi/testutils/HoodieClientTestBase.java       |  62 +++++-
 3 files changed, 482 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
new file mode 100644
index 00000000000..8a71a01fda9
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Objects;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for COPY_ON_WRITE table savepoint restore.
+ */
+@Tag("functional")
+public class TestSavepointRestoreCopyOnWrite extends HoodieClientTestBase {
+
+  /**
+   * Actions: C1, C2, savepoint C2, C3, C4, restore.
+   * Should go back to C2;
+   * C3 and C4 should be cleaned up.
+   */
+  @Test
+  void testBasicRollback() throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+      final int numRecords = 10;
+      for (int i = 1; i <= 4; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        // Write 4 insert batches; the 2nd commit will be savepointed
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * i, 1, Option.empty());
+        prevInstant = newCommitTime;
+        if (i == 2) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          client.savepoint("user1", "Savepoint for 2nd commit");
+        }
+      }
+      assertRowNumberEqualsTo(40);
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      assertRowNumberEqualsTo(20);
+    }
+  }
+
+  /**
+   * The restore should roll back all the pending instants that are beyond the savepoint.
+   *
+   * <p>Actions: C1, C2, savepoint C2, C3, C4 inflight, restore.
+   * Should go back to C2;
+   * C3, C4 should be cleaned up.
+   */
+  @Test
+  void testCleaningPendingInstants() throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+      final int numRecords = 10;
+      for (int i = 1; i <= 3; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        // Write 3 insert batches; the 2nd commit will be savepointed
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * i, 1, Option.empty());
+        prevInstant = newCommitTime;
+        if (i == 2) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          client.savepoint("user1", "Savepoint for 2nd commit");
+        }
+      }
+      assertRowNumberEqualsTo(30);
+      // write another pending instant
+      insertBatchWithoutCommit(HoodieActiveTimeline.createNewInstantTime(), numRecords);
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      assertRowNumberEqualsTo(20);
+    }
+  }
+
+  /**
+   * The rollbacks (either inflight or complete) beyond the savepoint should be cleaned.
+   *
+   * <p>Actions: C1, C2, savepoint C2, C3, C4 (RB_C3), C5, restore.
+   * Should go back to C2.
+   * C3, C4 (RB_C3), C5 should be cleaned up.
+   *
+   * <p>Actions: C1, C2, savepoint C2, C3, C4 (RB_C3) inflight, restore.
+   * Should go back to C2.
+   * C3, C4 (RB_C3) should be cleaned up.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testCleaningRollbackInstants(boolean commitRollback) throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER)
+        // eager cleaning
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+      final int numRecords = 10;
+      for (int i = 1; i <= 2; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        // Write 2 insert batches; the 2nd commit will be savepointed
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * i, 1, Option.empty());
+        prevInstant = newCommitTime;
+        if (i == 2) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          client.savepoint("user1", "Savepoint for 2nd commit");
+        }
+      }
+      assertRowNumberEqualsTo(20);
+      // write another pending instant
+      insertBatchWithoutCommit(HoodieActiveTimeline.createNewInstantTime(), numRecords);
+      // rollback the pending instant
+      if (commitRollback) {
+        client.rollbackFailedWrites();
+      } else {
+        HoodieInstant pendingInstant = metaClient.getActiveTimeline().filterPendingExcludingCompaction()
+            .lastInstant().orElseThrow(() -> new HoodieException("Pending instant does not exist"));
+        HoodieSparkTable.create(client.getConfig(), context)
+            .scheduleRollback(context, HoodieActiveTimeline.createNewInstantTime(), pendingInstant, false, true);
+      }
+      Option<String> rollbackInstant = metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().map(HoodieInstant::getTimestamp);
+      assertTrue(rollbackInstant.isPresent(), "The latest instant should be a rollback");
+      // write another batch
+      insertBatch(hoodieWriteConfig, client, HoodieActiveTimeline.createNewInstantTime(), rollbackInstant.get(), numRecords, SparkRDDWriteClient::insert,
+          false, true, numRecords, numRecords * 3, 1, Option.empty());
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      assertRowNumberEqualsTo(20);
+    }
+  }
+}

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
new file mode 100644
index 00000000000..6c1dfe5d734
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for MERGE_ON_READ table savepoint restore.
+ */
+@Tag("functional")
+public class TestSavepointRestoreMergeOnRead extends HoodieClientTestBase {
+
+  /**
+   * Actions: DC1, DC2, DC3, savepoint DC3, (snapshot query) DC4, C5, DC6, DC7, restore to DC3.
+   * Should roll back DC4, C5, DC6 and DC7.
+   * The latest file slice should be fully cleaned up, with a rollback log append for DC4 in the first file slice.
+   *
+   * <p>For example, file layout
+   * FG1:
+   *   BF1(DC1), LF1(DC2), LF2(DC3), LF3(DC4)
+   *   BF5(C5), LF1(DC6), LF2(DC7)
+   * After restore, it becomes
+   *   BF1(DC1), LF1(DC2), LF2(DC3), LF3(DC4), LF4(RB DC4)
+   *
+   * <p>Expected behaviors:
+   * snapshot query: the total record count matches,
+   * checking the row count by updating columns in (val4, val5, val6, val7).
+   */
+  @Test
+  void testCleaningDeltaCommits() throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit triggers compaction
+            .withInlineCompaction(true)
+            .build())
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      final int numRecords = 10;
+      List<HoodieRecord> baseRecordsToUpdate = null;
+      for (int i = 1; i <= 3; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        // Write 3 insert batches; the 3rd commit will be savepointed
+        client.startCommitWithTime(newCommitTime);
+        List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+        JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+        client.insert(writeRecords, newCommitTime);
+        if (i == 3) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          baseRecordsToUpdate = records;
+          client.savepoint("user1", "Savepoint for 3rd commit");
+        }
+      }
+
+      assertRowNumberEqualsTo(30);
+
+      // write another 3 delta commits
+      for (int i = 1; i <= 3; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        client.startCommitWithTime(newCommitTime);
+        List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, Objects.requireNonNull(baseRecordsToUpdate, "The records to update should not be null"));
+        JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+        client.upsert(writeRecords, newCommitTime);
+        if (i == 1) {
+          Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
+          assertTrue(compactionInstant.isPresent(), "A compaction plan should be scheduled");
+          client.compact(compactionInstant.get());
+        }
+      }
+
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      assertRowNumberEqualsTo(30);
+    }
+  }
+
+  /**
+   * <p>Actions: DC1, DC2, DC3, savepoint DC3, DC4, C5.pending, DC6, DC7, restore,
+   * should roll back until DC3.
+   *
+   * <p>Expected behaviors: the pending compaction after the savepoint should also be cleaned,
+   * the latest file slice should be fully deleted, and a rollback log append should be made for DC4.
+   */
+  @Test
+  void testCleaningPendingCompaction() throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit triggers compaction
+            .withInlineCompaction(false)
+            .withScheduleInlineCompaction(true)
+            .build())
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      final int numRecords = 10;
+      List<HoodieRecord> baseRecordsToUpdate = null;
+      for (int i = 1; i <= 3; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        // Write 3 insert batches; the 3rd commit will be savepointed
+        client.startCommitWithTime(newCommitTime);
+        List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+        JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+        client.insert(writeRecords, newCommitTime);
+        if (i == 3) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          baseRecordsToUpdate = records;
+          client.savepoint("user1", "Savepoint for 3rd commit");
+        }
+      }
+
+      assertRowNumberEqualsTo(30);
+
+      // write another 3 delta commits
+      for (int i = 1; i <= 3; i++) {
+        upsertBatch(writeClient, baseRecordsToUpdate);
+        if (i == 1) {
+          Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
+          assertTrue(compactionInstant.isPresent(), "A compaction plan should be scheduled");
+          compactWithoutCommit(compactionInstant.get());
+        }
+      }
+
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      assertRowNumberEqualsTo(30);
+    }
+  }
+
+  /**
+   * Actions: DC1, DC2, DC3, C4, savepoint C4, DC5, C6(RB_DC5), DC7, restore.
+   *
+   * <p>Expected behaviors: should roll back DC5, C6 (RB_DC5) and DC7.
+   * No files will be cleaned up, only rollback log appends.
+   */
+  @Test
+  void testCleaningCompletedRollback() throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(3) // the 3rd delta_commit triggers compaction
+            .withInlineCompaction(false)
+            .withScheduleInlineCompaction(true)
+            .build())
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      final int numRecords = 10;
+      List<HoodieRecord> baseRecordsToUpdate = null;
+      for (int i = 1; i <= 2; i++) {
+        String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+        // Write 2 insert batches; keep the 2nd batch of records for later updates
+        client.startCommitWithTime(newCommitTime);
+        List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+        JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+        client.insert(writeRecords, newCommitTime);
+        if (i == 2) {
+          baseRecordsToUpdate = records;
+        }
+      }
+
+      // update to generate log files, then a valid compaction plan can be scheduled
+      upsertBatch(client, baseRecordsToUpdate);
+      Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
+      assertTrue(compactionInstant.isPresent(), "A compaction plan should be scheduled");
+      client.compact(compactionInstant.get());
+      savepointCommit = compactionInstant.get();
+      client.savepoint("user1", "Savepoint for 3rd commit");
+
+      assertRowNumberEqualsTo(20);
+      // write a delta_commit but do not commit it
+      updateBatchWithoutCommit(HoodieActiveTimeline.createNewInstantTime(), Objects.requireNonNull(baseRecordsToUpdate, "The records to update should not be null"));
+      // rollback the delta_commit
+      assertTrue(writeClient.rollbackFailedWrites(), "The last delta_commit should be rolled back");
+
+      // another update
+      upsertBatch(writeClient, baseRecordsToUpdate);
+
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      assertRowNumberEqualsTo(20);
+    }
+  }
+
+  private void upsertBatch(SparkRDDWriteClient client, List<HoodieRecord> baseRecordsToUpdate) throws IOException {
+    String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    client.startCommitWithTime(newCommitTime);
+    List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, Objects.requireNonNull(baseRecordsToUpdate, "The records to update should not be null"));
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    client.upsert(writeRecords, newCommitTime);
+  }
+
+  private void compactWithoutCommit(String compactionInstantTime) {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withAutoCommit(false) // disable auto commit
+        .withRollbackUsingMarkers(true)
+        .build();
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      JavaRDD<WriteStatus> statuses = (JavaRDD<WriteStatus>) client.compact(compactionInstantTime).getWriteStatuses();
+      assertNoWriteErrors(statuses.collect());
+    }
+  }
+
+  @Override
+  protected HoodieTableType getTableType() {
+    return HoodieTableType.MERGE_ON_READ;
+  }
+}

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 2b4172781c6..43f843a4f33 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -597,7 +597,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
 
     // verify that there is a commit
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
-    HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+    HoodieTimeline timeline = metaClient.getCommitsTimeline();
 
     if (assertForCommit) {
       assertEquals(expTotalCommits, timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants(),
@@ -700,6 +700,66 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
     return result;
   }
 
+  /**
+   * Inserts a batch of records without committing, so that the instant is left inflight.
+   *
+   * @param newCommitTime The commit time
+   * @param numRecords    The number of records to insert
+   */
+  @SuppressWarnings("rawtypes, unchecked")
+  protected void insertBatchWithoutCommit(String newCommitTime, int numRecords) {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withAutoCommit(false) // disable auto commit
+        .withRollbackUsingMarkers(true)
+        .build();
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+      List<WriteStatus> statuses = client.insert(writeRecords, newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+    }
+  }
+
+  /**
+   * Updates a batch of records without committing, so that the instant is left inflight.
+   *
+   * @param newCommitTime       The commit time
+   * @param baseRecordsToUpdate The base records to update
+   */
+  @SuppressWarnings("rawtypes, unchecked")
+  protected void updateBatchWithoutCommit(String newCommitTime, List<HoodieRecord> baseRecordsToUpdate) throws IOException {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withAutoCommit(false) // disable auto commit
+        .withRollbackUsingMarkers(true)
+        .build();
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, baseRecordsToUpdate);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      client.upsert(writeRecords, newCommitTime);
+    }
+  }
+
+  /**
+   * Asserts that the row count of the table equals {@code numRows}.
+   *
+   * @param numRows The expected number of rows
+   */
+  protected void assertRowNumberEqualsTo(int numRows) {
+    // read back the entire dataset and check the record count
+    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    for (int i = 0; i < fullPartitionPaths.length; i++) {
+      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+    }
+    assertEquals(numRows, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
+        "Must contain " + numRows + " records");
+  }
+
   /**
    * Get Cleaner state corresponding to a partition path.
    *
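
For readers skimming the patch, the savepoint/restore flow that all three test classes exercise condenses to the sketch below. It uses only calls that appear in the patch itself (startCommitWithTime, insert, savepoint, restoreToSavepoint); the client, jsc and dataGen handles are assumed to come from the HoodieClientTestBase harness, so this is an illustration rather than a standalone program.

    // Illustrative sketch of the tested flow, assuming a SparkRDDWriteClient
    // `client`, a JavaSparkContext `jsc` and a data generator `dataGen`
    // provided by the HoodieClientTestBase harness (not part of the commit).
    String c1 = HoodieActiveTimeline.createNewInstantTime();
    client.startCommitWithTime(c1);
    client.insert(jsc.parallelize(dataGen.generateInserts(c1, 10), 1), c1);  // C1

    String c2 = HoodieActiveTimeline.createNewInstantTime();
    client.startCommitWithTime(c2);
    client.insert(jsc.parallelize(dataGen.generateInserts(c2, 10), 1), c2);  // C2
    client.savepoint("user1", "Savepoint for 2nd commit");                   // savepoint C2

    String c3 = HoodieActiveTimeline.createNewInstantTime();
    client.startCommitWithTime(c3);
    client.insert(jsc.parallelize(dataGen.generateInserts(c3, 10), 1), c3);  // C3

    client.restoreToSavepoint(c2);  // the table goes back to C2; C3 is cleaned up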
