codope commented on a change in pull request #3695:
URL: https://github.com/apache/hudi/pull/3695#discussion_r713737135



##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -886,194 +445,112 @@ public void testCleaningArchivingAndCompaction() throws Exception {
   /**
    * Test various error scenarios.
    */
-  @Test
-  public void testErrorCases() throws Exception {
-    init(HoodieTableType.COPY_ON_WRITE);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testErrorCases(HoodieTableType tableType) throws Exception {
+    init(tableType);
     // TESTCASE: If commit on the metadata table succeeds but fails on the dataset, then on next init the metadata table
     // should be rolled back to last valid commit.
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
-      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
-      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateInserts(newCommitTime, 5);
-      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-
-      // There is no way to simulate failed commit on the main dataset, hence we simply delete the completed
-      // instant so that only the inflight is left over.
-      String commitInstantFileName = HoodieTimeline.makeCommitFileName(newCommitTime);
-      assertTrue(fs.delete(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME,
-          commitInstantFileName), false));
-    }
-
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
-      String newCommitTime = client.startCommit();
-      // Next insert
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 5);
-      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-
-      // Post rollback commit and metadata should be valid
-      validateMetadata(client);
-    }
+    testTable.doWriteOperation("001", WriteOperationType.UPSERT, Arrays.asList("p1", "p2"),
+        Arrays.asList("p1", "p2"), 1);
+    syncAndValidate(testTable);
+    testTable.doWriteOperation("002", WriteOperationType.BULK_INSERT, Collections.emptyList(),
+        Arrays.asList("p1", "p2"), 1);
+    syncAndValidate(testTable);
+    // There is no way to simulate failed commit on the main dataset, hence we simply delete the completed
+    // instant so that only the inflight is left over.
+    String commitInstantFileName = HoodieTimeline.makeCommitFileName("002");
+    assertTrue(fs.delete(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME,
+        commitInstantFileName), false));
+    // Next upsert
+    testTable.doWriteOperation("003", WriteOperationType.UPSERT, Collections.emptyList(),
+        Arrays.asList("p1", "p2"), 1);
+    // Post rollback commit and metadata should be valid
+    syncTableMetadata(writeConfig);
+    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
+    HoodieActiveTimeline timeline = metadataMetaClient.getActiveTimeline();
+    assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "001")));
+    assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002")));
+    assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "003")));
   }
 
   /**
    * Test non-partitioned datasets.
    */
-  //@Test
-  public void testNonPartitioned() throws Exception {
-    init(HoodieTableType.COPY_ON_WRITE);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    HoodieTestDataGenerator nonPartitionedGenerator = new HoodieTestDataGenerator(new String[] {""});
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-      // Write 1 (Bulk insert)
-      String newCommitTime = "001";
-      List<HoodieRecord> records = nonPartitionedGenerator.generateInserts(newCommitTime, 10);
-      client.startCommitWithTime(newCommitTime);
-      List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      validateMetadata(client);
-
-      List<String> metadataPartitions = metadata(client).getAllPartitionPaths();
-      assertTrue(metadataPartitions.contains(""), "Must contain empty partition");
-    }
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testNonPartitioned(HoodieTableType tableType) throws Exception {
+    init(tableType);
+    // Non-partitioned bulk insert
+    testTable.doWriteOperation("001", WriteOperationType.BULK_INSERT, Collections.emptyList(), 1);
+    syncTableMetadata(writeConfig);
+    List<String> metadataPartitions = metadata(writeConfig, context).getAllPartitionPaths();
+    assertTrue(metadataPartitions.isEmpty(), "Must not contain any partitions");
   }
 
   /**
    * Test various metrics published by metadata table.
    */
-  @Test
-  public void testMetadataMetrics() throws Exception {
-    init(HoodieTableType.COPY_ON_WRITE);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfigBuilder(true, true, true).build())) {
-      // Write
-      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
-      client.startCommitWithTime(newCommitTime);
-      List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      Registry metricsRegistry = Registry.getRegistry("HoodieMetadata");
-      assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".count"));
-      assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".totalDuration"));
-      assertTrue(metricsRegistry.getAllCounts().get(HoodieMetadataMetrics.INITIALIZE_STR + ".count") >= 1L);
-      assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.size"));
-      assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.size"));
-      assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.count"));
-      assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.count"));
-    }
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testMetadataMetrics(HoodieTableType tableType) throws Exception {
+    init(tableType);
+    writeConfig = getWriteConfigBuilder(true, true, true).build();
+    testTable.doWriteOperation(HoodieActiveTimeline.createNewInstantTime(), WriteOperationType.INSERT, Arrays.asList("p1", "p2"),
+        Arrays.asList("p1", "p2"), 2, true);
+    syncTableMetadata(writeConfig);
+    Registry metricsRegistry = Registry.getRegistry("HoodieMetadata");
+    assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".count"));
+    assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".totalDuration"));
+    assertTrue(metricsRegistry.getAllCounts().get(HoodieMetadataMetrics.INITIALIZE_STR + ".count") >= 1L);
+    assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.size"));
+    assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.size"));
+    assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.count"));
+    assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.count"));
   }
 
   /**
   * Test when reading from metadata table which is out of sync with dataset that results are still consistent.
    */
-  @Test
-  public void testMetadataOutOfSync() throws Exception {
-    init(HoodieTableType.COPY_ON_WRITE);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    SparkRDDWriteClient unsyncedClient = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true));
-
-    // Enable metadata so table is initialized
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
-      // Perform Bulk Insert
-      String newCommitTime = "001";
-      client.startCommitWithTime(newCommitTime);
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
-      client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-    }
-
-    // Perform commit operations with metadata disabled
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
-      // Perform Insert
-      String newCommitTime = "002";
-      client.startCommitWithTime(newCommitTime);
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
-      client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
-
-      // Perform Upsert
-      newCommitTime = "003";
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 20);
-      client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-
-      // Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = "004";
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-      }
-    }
-
-    assertFalse(metadata(unsyncedClient).isInSync());
-    validateMetadata(unsyncedClient);
-
-    // Perform clean operation with metadata disabled
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
-      // One more commit needed to trigger clean so upsert and compact
-      String newCommitTime = "005";
-      client.startCommitWithTime(newCommitTime);
-      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 20);
-      client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = "006";
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-      }
-
-      // Clean
-      newCommitTime = "007";
-      client.clean(newCommitTime);
-    }
-
-    assertFalse(metadata(unsyncedClient).isInSync());
-    validateMetadata(unsyncedClient);
-
-    // Perform restore with metadata disabled
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testMetadataOutOfSync(HoodieTableType tableType) throws Exception {
+    init(tableType);
+    testTable.doWriteOperation("001", WriteOperationType.BULK_INSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 1);
+    // Enable metadata so table is initialized but do not sync
+    syncAndValidate(testTable, Collections.emptyList(), true, false, false);
+    // Perform an insert and upsert
+    testTable.doWriteOperation("002", WriteOperationType.INSERT, Arrays.asList("p1", "p2"), 1);
+    testTable.doWriteOperation("003", WriteOperationType.UPSERT, Collections.singletonList("p3"), Arrays.asList("p1", "p2", "p3"), 1);
+    // Run compaction for MOR table
+    if (MERGE_ON_READ.equals(tableType)) {
+      testTable = testTable.doCompaction("004", Arrays.asList("p1", "p2"));
+    }
+    assertFalse(metadata(writeConfig, context).isInSync());
+    testTable.doWriteOperation("005", WriteOperationType.UPSERT, Arrays.asList("p1", "p2", "p3"), 1);
+    if (MERGE_ON_READ.equals(tableType)) {
+      testTable = testTable.doCompaction("006", Arrays.asList("p1", "p2"));
+    }
+    testTable.doClean("007", Collections.singletonList("001"));
+    /* TODO: Perform restore with metadata disabled

Review comment:
       Yeah. I was planning to take this up as a follow-up after https://issues.apache.org/jira/browse/HUDI-2432
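
       For reference, a minimal sketch of what that deferred restore step could look like, reusing the test-table helpers from this diff. This is only an illustration under stated assumptions: doRestore is a hypothetical helper and the instants "008"/"009" are made up, so the real API after HUDI-2432 may differ.

           // Hedged sketch, not part of this PR: exercise restore with metadata
           // sync disabled, then let the next sync reconcile the metadata table.
           testTable.doWriteOperation("008", WriteOperationType.UPSERT, Arrays.asList("p1", "p2", "p3"), 1);
           testTable.doRestore("003", "009"); // hypothetical helper: restore the table back to instant "003"
           syncAndValidate(testTable);        // metadata table should be consistent again after the sync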



