codope commented on a change in pull request #3695:
URL: https://github.com/apache/hudi/pull/3695#discussion_r713737777
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -264,344 +156,109 @@ public void testOnlyValidPartitionsAdded() throws Exception {
final String filteredDirectoryThree = ".backups";
// Create some commits
- HoodieTestTable testTable = HoodieTestTable.of(metaClient);
testTable.withPartitionMetaFiles("p1", "p2", filteredDirectoryOne, filteredDirectoryTwo, filteredDirectoryThree)
.addCommit("001").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10)
.addCommit("002").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10, 10);
- final HoodieWriteConfig writeConfig =
- getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.NEVER, true, true, false)
+ writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.NEVER, true, true, false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withDirectoryFilterRegex(filterDirRegex).build()).build();
- try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
- client.startCommitWithTime("005");
- client.insert(jsc.emptyRDD(), "005");
-
- List<String> partitions = metadataWriter(client).metadata().getAllPartitionPaths();
- assertFalse(partitions.contains(nonPartitionDirectory),
- "Must not contain the non-partition " + nonPartitionDirectory);
- assertTrue(partitions.contains("p1"), "Must contain partition p1");
- assertTrue(partitions.contains("p2"), "Must contain partition p2");
-
- assertFalse(partitions.contains(filteredDirectoryOne),
- "Must not contain the filtered directory " + filteredDirectoryOne);
- assertFalse(partitions.contains(filteredDirectoryTwo),
- "Must not contain the filtered directory " + filteredDirectoryTwo);
- assertFalse(partitions.contains(filteredDirectoryThree),
- "Must not contain the filtered directory " + filteredDirectoryThree);
-
- FileStatus[] statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p1"));
- assertEquals(2, statuses.length);
- statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p2"));
- assertEquals(5, statuses.length);
- Map<String, FileStatus[]> partitionsToFilesMap = metadata(client).getAllFilesInPartitions(
- Arrays.asList(basePath + "/p1", basePath + "/p2"));
- assertEquals(2, partitionsToFilesMap.size());
- assertEquals(2, partitionsToFilesMap.get(basePath + "/p1").length);
- assertEquals(5, partitionsToFilesMap.get(basePath + "/p2").length);
- }
+ testTable.doWriteOperation("003", WriteOperationType.UPSERT,
Collections.emptyList(), Arrays.asList("p1", "p2"), 1, true);
+ syncTableMetadata(writeConfig);
+
+ List<String> partitions = metadataWriter(writeConfig).metadata().getAllPartitionPaths();
+ assertFalse(partitions.contains(nonPartitionDirectory),
+ "Must not contain the non-partition " + nonPartitionDirectory);
+ assertTrue(partitions.contains("p1"), "Must contain partition p1");
+ assertTrue(partitions.contains("p2"), "Must contain partition p2");
+
+ assertFalse(partitions.contains(filteredDirectoryOne),
+ "Must not contain the filtered directory " + filteredDirectoryOne);
+ assertFalse(partitions.contains(filteredDirectoryTwo),
+ "Must not contain the filtered directory " + filteredDirectoryTwo);
+ assertFalse(partitions.contains(filteredDirectoryThree),
+ "Must not contain the filtered directory " + filteredDirectoryThree);
+
+ FileStatus[] statuses = metadata(writeConfig, context).getAllFilesInPartition(new Path(basePath, "p1"));
+ assertEquals(3, statuses.length);
+ statuses = metadata(writeConfig, context).getAllFilesInPartition(new Path(basePath, "p2"));
+ assertEquals(6, statuses.length);
+ Map<String, FileStatus[]> partitionsToFilesMap = metadata(writeConfig, context).getAllFilesInPartitions(
+ Arrays.asList(basePath + "/p1", basePath + "/p2"));
+ assertEquals(2, partitionsToFilesMap.size());
+ assertEquals(3, partitionsToFilesMap.get(basePath + "/p1").length);
+ assertEquals(6, partitionsToFilesMap.get(basePath + "/p2").length);
}
/**
* Test various table operations sync to Metadata Table correctly.
*/
@ParameterizedTest
- @EnumSource(HoodieTableType.class)
- public void testTableOperations(HoodieTableType tableType) throws Exception {
+ @MethodSource("bootstrapAndTableOperationTestArgs")
+ public void testTableOperations(HoodieTableType tableType, boolean doNotSyncFewCommits) throws Exception {
init(tableType);
- HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
- try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-
- // Write 1 (Bulk insert)
- String newCommitTime = "001";
- List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
- client.startCommitWithTime(newCommitTime);
- List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
- validateMetadata(client);
+ // bootstrap w/ 2 commits
+ bootstrapMetadata(testTable);
- // Write 2 (inserts)
- newCommitTime = "002";
- client.startCommitWithTime(newCommitTime);
- validateMetadata(client);
-
- records = dataGen.generateInserts(newCommitTime, 20);
- writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
- validateMetadata(client);
-
- // Write 3 (updates)
- newCommitTime = "003";
- client.startCommitWithTime(newCommitTime);
- records = dataGen.generateUniqueUpdates(newCommitTime, 10);
- writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
- validateMetadata(client);
-
- // Write 4 (updates and inserts)
- newCommitTime = "004";
- client.startCommitWithTime(newCommitTime);
- records = dataGen.generateUpdates(newCommitTime, 10);
- writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
- validateMetadata(client);
+ // trigger an upsert
+ testTable.doWriteOperation("003", WriteOperationType.UPSERT,
Collections.singletonList("p3"), Arrays.asList("p1", "p2", "p3"), 3);
+ syncAndValidate(testTable);
- // Compaction
- if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
- newCommitTime = "005";
- client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
- client.compact(newCommitTime);
- validateMetadata(client);
- }
-
- // Write 5 (updates and inserts)
- newCommitTime = "006";
- client.startCommitWithTime(newCommitTime);
- records = dataGen.generateUpdates(newCommitTime, 5);
- writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
- validateMetadata(client);
-
- // Compaction
- if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
- newCommitTime = "007";
- client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
- client.compact(newCommitTime);
- validateMetadata(client);
- }
-
- // Deletes
- newCommitTime = "008";
- records = dataGen.generateDeletes(newCommitTime, 10);
- JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
- client.startCommitWithTime(newCommitTime);
- client.delete(deleteKeys, newCommitTime);
- validateMetadata(client);
-
- // Clean
- newCommitTime = "009";
- client.clean(newCommitTime);
- validateMetadata(client);
-
- // Restore
- client.restoreToInstant("006");
- validateMetadata(client);
+ // trigger compaction
+ if (MERGE_ON_READ.equals(tableType)) {
+ testTable = testTable.doCompaction("004", Arrays.asList("p1", "p2"));
+ syncAndValidate(testTable);
}
- }
-
- /**
- * Test rollback of various table operations sync to Metadata Table correctly.
- */
- @ParameterizedTest
- @EnumSource(HoodieTableType.class)
- public void testRollbackOperations(HoodieTableType tableType) throws Exception {
- init(tableType);
- HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
- try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
- // Write 1 (Bulk insert)
- String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
- client.startCommitWithTime(newCommitTime);
- List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
- validateMetadata(client);
-
- // Write 2 (inserts) + Rollback of inserts
- newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- client.startCommitWithTime(newCommitTime);
- records = dataGen.generateInserts(newCommitTime, 20);
- writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
- validateMetadata(client);
- client.rollback(newCommitTime);
- client.syncTableMetadata();
- validateMetadata(client);
-
- // Write 3 (updates) + Rollback of updates
- newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- client.startCommitWithTime(newCommitTime);
- records = dataGen.generateUniqueUpdates(newCommitTime, 20);
- writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
- validateMetadata(client);
- client.rollback(newCommitTime);
- client.syncTableMetadata();
- validateMetadata(client);
- // Rollback of updates and inserts
- newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- client.startCommitWithTime(newCommitTime);
- records = dataGen.generateUpdates(newCommitTime, 10);
- writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
- validateMetadata(client);
- client.rollback(newCommitTime);
- client.syncTableMetadata();
- validateMetadata(client);
-
- // Rollback of Compaction
- if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
- newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
- client.compact(newCommitTime);
- validateMetadata(client);
- }
-
- // Rollback of Deletes
- newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- records = dataGen.generateDeletes(newCommitTime, 10);
- JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
- client.startCommitWithTime(newCommitTime);
- writeStatuses = client.delete(deleteKeys, newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
- validateMetadata(client);
- client.rollback(newCommitTime);
- client.syncTableMetadata();
- validateMetadata(client);
-
- // Rollback of Clean
- newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- client.clean(newCommitTime);
- validateMetadata(client);
- client.rollback(newCommitTime);
- client.syncTableMetadata();
- validateMetadata(client);
+ // trigger an upsert
+ testTable.doWriteOperation("005", WriteOperationType.UPSERT,
Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2);
+ if (doNotSyncFewCommits) {
+ syncAndValidate(testTable, Collections.emptyList(), true, false, true);
}
- // Rollback of partial commits
- try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
- getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(false).build())) {
- // Write updates and inserts
- String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- client.startCommitWithTime(newCommitTime);
- List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
- List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
- client.rollback(newCommitTime);
- client.syncTableMetadata();
- validateMetadata(client);
+ // trigger clean
+ testTable.doClean("006", Collections.singletonList("001"));
+ if (doNotSyncFewCommits) {
+ syncAndValidate(testTable, Collections.emptyList(), true, false, false);
}
- // Marker based rollback of partial commits
- try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
- getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(true).build())) {
- // Write updates and inserts
- String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- client.startCommitWithTime(newCommitTime);
- List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
- List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
- assertNoWriteErrors(writeStatuses);
- client.rollback(newCommitTime);
- client.syncTableMetadata();
- validateMetadata(client);
- }
+ // trigger delete
+ testTable.doWriteOperation("007", WriteOperationType.DELETE,
Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2);
+ syncAndValidate(testTable, Collections.emptyList(), true, true, false);
}
/**
- * Test when syncing rollback to metadata if the commit being rolled back has not been synced that essentially a no-op occurs to metadata.
- * Once explicit sync is called, metadata should match.
+ * Tests rollback of a commit with metadata enabled.
*/
@ParameterizedTest
@EnumSource(HoodieTableType.class)
- public void testRollbackUnsyncedCommit(HoodieTableType tableType) throws Exception {
+ public void testRollbackOperations(HoodieTableType tableType) throws Exception {
Review comment:
Added the missing operations in the test.
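For anyone following the refactor, the round trip those operations exercise is the one the removed client-based code above spelled out. A minimal sketch assembled from those removed lines (helpers such as `getWriteConfig`, `dataGen`, `validateMetadata`, and `assertNoWriteErrors` are members of this test class):

```java
// One write-rollback-resync round trip, as exercised by the pre-refactor test.
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
  String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
  client.startCommitWithTime(newCommitTime);
  List<HoodieRecord> records = dataGen.generateUniqueUpdates(newCommitTime, 20);
  List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
  assertNoWriteErrors(writeStatuses);
  validateMetadata(client);        // metadata table reflects the committed upsert

  client.rollback(newCommitTime);  // roll the commit back on the data table
  client.syncTableMetadata();      // propagate the rollback to the metadata table
  validateMetadata(client);        // metadata and data tables agree again
}
```

The refactored test drives the same sequence through the `HoodieTestTable` helpers (`doWriteOperation`, `doClean`, `doCompaction`) followed by `syncAndValidate`, which keeps each operation to a single line.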