codope commented on a change in pull request #3695:
URL: https://github.com/apache/hudi/pull/3695#discussion_r713737777



##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -264,344 +156,109 @@ public void testOnlyValidPartitionsAdded() throws 
Exception {
     final String filteredDirectoryThree = ".backups";
 
     // Create some commits
-    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
     testTable.withPartitionMetaFiles("p1", "p2", filteredDirectoryOne, 
filteredDirectoryTwo, filteredDirectoryThree)
         .addCommit("001").withBaseFilesInPartition("p1", 
10).withBaseFilesInPartition("p2", 10, 10)
         .addCommit("002").withBaseFilesInPartition("p1", 
10).withBaseFilesInPartition("p2", 10, 10, 10);
 
-    final HoodieWriteConfig writeConfig =
-            getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.NEVER, 
true, true, false)
+    writeConfig = 
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.NEVER, true, true, false)
         
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withDirectoryFilterRegex(filterDirRegex).build()).build();
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig)) {
-      client.startCommitWithTime("005");
-      client.insert(jsc.emptyRDD(), "005");
-
-      List<String> partitions = 
metadataWriter(client).metadata().getAllPartitionPaths();
-      assertFalse(partitions.contains(nonPartitionDirectory),
-          "Must not contain the non-partition " + nonPartitionDirectory);
-      assertTrue(partitions.contains("p1"), "Must contain partition p1");
-      assertTrue(partitions.contains("p2"), "Must contain partition p2");
-
-      assertFalse(partitions.contains(filteredDirectoryOne),
-          "Must not contain the filtered directory " + filteredDirectoryOne);
-      assertFalse(partitions.contains(filteredDirectoryTwo),
-          "Must not contain the filtered directory " + filteredDirectoryTwo);
-      assertFalse(partitions.contains(filteredDirectoryThree),
-          "Must not contain the filtered directory " + filteredDirectoryThree);
-
-      FileStatus[] statuses = metadata(client).getAllFilesInPartition(new 
Path(basePath, "p1"));
-      assertEquals(2, statuses.length);
-      statuses = metadata(client).getAllFilesInPartition(new Path(basePath, 
"p2"));
-      assertEquals(5, statuses.length);
-      Map<String, FileStatus[]> partitionsToFilesMap = 
metadata(client).getAllFilesInPartitions(
-          Arrays.asList(basePath + "/p1", basePath + "/p2"));
-      assertEquals(2, partitionsToFilesMap.size());
-      assertEquals(2, partitionsToFilesMap.get(basePath + "/p1").length);
-      assertEquals(5, partitionsToFilesMap.get(basePath + "/p2").length);
-    }
+    testTable.doWriteOperation("003", WriteOperationType.UPSERT, 
Collections.emptyList(), Arrays.asList("p1", "p2"), 1, true);
+    syncTableMetadata(writeConfig);
+
+    List<String> partitions = 
metadataWriter(writeConfig).metadata().getAllPartitionPaths();
+    assertFalse(partitions.contains(nonPartitionDirectory),
+        "Must not contain the non-partition " + nonPartitionDirectory);
+    assertTrue(partitions.contains("p1"), "Must contain partition p1");
+    assertTrue(partitions.contains("p2"), "Must contain partition p2");
+
+    assertFalse(partitions.contains(filteredDirectoryOne),
+        "Must not contain the filtered directory " + filteredDirectoryOne);
+    assertFalse(partitions.contains(filteredDirectoryTwo),
+        "Must not contain the filtered directory " + filteredDirectoryTwo);
+    assertFalse(partitions.contains(filteredDirectoryThree),
+        "Must not contain the filtered directory " + filteredDirectoryThree);
+
+    FileStatus[] statuses = metadata(writeConfig, 
context).getAllFilesInPartition(new Path(basePath, "p1"));
+    assertEquals(3, statuses.length);
+    statuses = metadata(writeConfig, context).getAllFilesInPartition(new 
Path(basePath, "p2"));
+    assertEquals(6, statuses.length);
+    Map<String, FileStatus[]> partitionsToFilesMap = metadata(writeConfig, 
context).getAllFilesInPartitions(
+        Arrays.asList(basePath + "/p1", basePath + "/p2"));
+    assertEquals(2, partitionsToFilesMap.size());
+    assertEquals(3, partitionsToFilesMap.get(basePath + "/p1").length);
+    assertEquals(6, partitionsToFilesMap.get(basePath + "/p2").length);
   }
 
   /**
    * Test various table operations sync to Metadata Table correctly.
    */
   @ParameterizedTest
-  @EnumSource(HoodieTableType.class)
-  public void testTableOperations(HoodieTableType tableType) throws Exception {
+  @MethodSource("bootstrapAndTableOperationTestArgs")
+  public void testTableOperations(HoodieTableType tableType, boolean 
doNotSyncFewCommits) throws Exception {
     init(tableType);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
getWriteConfig(true, true))) {
-
-      // Write 1 (Bulk insert)
-      String newCommitTime = "001";
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
-      client.startCommitWithTime(newCommitTime);
-      List<WriteStatus> writeStatuses = 
client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
+    // bootstrap w/ 2 commits
+    bootstrapMetadata(testTable);
 
-      // Write 2 (inserts)
-      newCommitTime = "002";
-      client.startCommitWithTime(newCommitTime);
-      validateMetadata(client);
-
-      records = dataGen.generateInserts(newCommitTime, 20);
-      writeStatuses = client.insert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      // Write 3 (updates)
-      newCommitTime = "003";
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      // Write 4 (updates and inserts)
-      newCommitTime = "004";
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
+    // trigger an upsert
+    testTable.doWriteOperation("003", WriteOperationType.UPSERT, 
Collections.singletonList("p3"), Arrays.asList("p1", "p2", "p3"), 3);
+    syncAndValidate(testTable);
 
-      // Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = "005";
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-        validateMetadata(client);
-      }
-
-      // Write 5 (updates and inserts)
-      newCommitTime = "006";
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUpdates(newCommitTime, 5);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      // Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = "007";
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-        validateMetadata(client);
-      }
-
-      // Deletes
-      newCommitTime = "008";
-      records = dataGen.generateDeletes(newCommitTime, 10);
-      JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> 
r.getKey());
-      client.startCommitWithTime(newCommitTime);
-      client.delete(deleteKeys, newCommitTime);
-      validateMetadata(client);
-
-      // Clean
-      newCommitTime = "009";
-      client.clean(newCommitTime);
-      validateMetadata(client);
-
-      // Restore
-      client.restoreToInstant("006");
-      validateMetadata(client);
+    // trigger compaction
+    if (MERGE_ON_READ.equals(tableType)) {
+      testTable = testTable.doCompaction("004", Arrays.asList("p1", "p2"));
+      syncAndValidate(testTable);
     }
-  }
-
-  /**
-   * Test rollback of various table operations sync to Metadata Table 
correctly.
-   */
-  @ParameterizedTest
-  @EnumSource(HoodieTableType.class)
-  public void testRollbackOperations(HoodieTableType tableType) throws 
Exception {
-    init(tableType);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
getWriteConfig(true, true))) {
-      // Write 1 (Bulk insert)
-      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
-      client.startCommitWithTime(newCommitTime);
-      List<WriteStatus> writeStatuses = 
client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      // Write 2 (inserts) + Rollback of inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateInserts(newCommitTime, 20);
-      writeStatuses = client.insert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
-
-      // Write 3 (updates) + Rollback of updates
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 20);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
 
-      // Rollback of updates and inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
-
-      // Rollback of Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-        validateMetadata(client);
-      }
-
-      // Rollback of Deletes
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateDeletes(newCommitTime, 10);
-      JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> 
r.getKey());
-      client.startCommitWithTime(newCommitTime);
-      writeStatuses = client.delete(deleteKeys, newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
-
-      // Rollback of Clean
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.clean(newCommitTime);
-      validateMetadata(client);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
+    // trigger an upsert
+    testTable.doWriteOperation("005", WriteOperationType.UPSERT, 
Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2);
+    if (doNotSyncFewCommits) {
+      syncAndValidate(testTable, Collections.emptyList(), true, false, true);
     }
 
-    // Rollback of partial commits
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
-        getWriteConfigBuilder(false, true, 
false).withRollbackUsingMarkers(false).build())) {
-      // Write updates and inserts
-      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
-      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 
1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
+    // trigger clean
+    testTable.doClean("006", Collections.singletonList("001"));
+    if (doNotSyncFewCommits) {
+      syncAndValidate(testTable, Collections.emptyList(), true, false, false);
     }
 
-    // Marker based rollback of partial commits
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
-        getWriteConfigBuilder(false, true, 
false).withRollbackUsingMarkers(true).build())) {
-      // Write updates and inserts
-      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
-      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 
1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      client.rollback(newCommitTime);
-      client.syncTableMetadata();
-      validateMetadata(client);
-    }
+    // trigger delete
+    testTable.doWriteOperation("007", WriteOperationType.DELETE, 
Collections.emptyList(), Arrays.asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, Collections.emptyList(), true, true, false);
   }
 
   /**
-   * Test when syncing rollback to metadata if the commit being rolled back 
has not been synced that essentially a no-op occurs to metadata.
-   * Once explicit sync is called, metadata should match.
+   * Tests rollback of a commit with metadata enabled.
    */
   @ParameterizedTest
   @EnumSource(HoodieTableType.class)
-  public void testRollbackUnsyncedCommit(HoodieTableType tableType) throws 
Exception {
+  public void testRollbackOperations(HoodieTableType tableType) throws 
Exception {

Review comment:
       Added the missing operations in the test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to