nsivabalan commented on a change in pull request #3695:
URL: https://github.com/apache/hudi/pull/3695#discussion_r713842735



##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
##########
@@ -418,4 +445,177 @@ public HoodieTableFileSystemView 
getHoodieTableFileSystemView(HoodieTableMetaCli
     }
     return Pair.of(partitionPathStatMap, globalStat);
   }
+
+  /**
+   * Validate the metadata tables contents to ensure it matches what is on the 
file system.
+   */
+  public void validateMetadata(HoodieTestTable testTable, List<String> 
inflightCommits, HoodieWriteConfig writeConfig,
+                               String metadataTableBasePath, boolean 
doFullValidation) throws IOException {
+    HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
+    assertNotNull(tableMetadata, "MetadataReader should have been 
initialized");
+    if (!writeConfig.isMetadataTableEnabled() || 
!writeConfig.getMetadataConfig().validateFileListingMetadata()) {
+      return;
+    }
+
+    assertEquals(inflightCommits, testTable.inflightCommits());
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // Partitions should match
+    List<java.nio.file.Path> fsPartitionPaths = 
testTable.getAllPartitionPaths();
+    List<String> fsPartitions = new ArrayList<>();
+    fsPartitionPaths.forEach(entry -> 
fsPartitions.add(entry.getFileName().toString()));
+    List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
+
+    Collections.sort(fsPartitions);
+    Collections.sort(metadataPartitions);
+
+    assertEquals(fsPartitions.size(), metadataPartitions.size(), "Partitions 
should match");
+    assertEquals(fsPartitions, metadataPartitions, "Partitions should match");
+
+    // Files within each partition should match
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext);
+    TableFileSystemView tableView = table.getHoodieView();
+    List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> 
basePath + "/" + partition).collect(Collectors.toList());
+    Map<String, FileStatus[]> partitionToFilesMap = 
tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
+    assertEquals(fsPartitions.size(), partitionToFilesMap.size());
+
+    fsPartitions.forEach(partition -> {
+      try {
+        validateFilesPerPartition(testTable, tableMetadata, tableView, 
partitionToFilesMap, partition);
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();

Review comment:
       Let's remove `e.printStackTrace();` — it swallows the failure; please rethrow or log the exception instead.

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -612,214 +296,120 @@ public void testRollbackUnsyncedCommit(HoodieTableType 
tableType) throws Excepti
   @EnumSource(HoodieTableType.class)
   public void testManualRollbacks(HoodieTableType tableType) throws Exception {
     init(tableType);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    doWriteOperationsAndBootstrapMetadata(testTable);
 
     // Setting to archive more aggressively on the Metadata Table than the 
Dataset
     final int maxDeltaCommitsBeforeCompaction = 4;
     final int minArchiveCommitsMetadata = 2;
     final int minArchiveCommitsDataset = 4;
-    HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
+    writeConfig = getWriteConfigBuilder(true, true, false)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
             .archiveCommitsWith(minArchiveCommitsMetadata, 
minArchiveCommitsMetadata + 1).retainCommits(1)
             
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
         
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(minArchiveCommitsDataset,
 minArchiveCommitsDataset + 1)
             
.retainCommits(1).retainFileVersions(1).withAutoClean(false).withAsyncClean(true).build())
         .build();
-
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
config)) {
-      // Initialize table with metadata
-      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
-      client.startCommitWithTime(newCommitTime);
-      List<WriteStatus> writeStatuses = 
client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      // Perform multiple commits
-      for (int i = 1; i < 10; ++i) {
-        newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-        if (i == 1) {
-          records = dataGen.generateInserts(newCommitTime, 5);
-        } else {
-          records = dataGen.generateUpdates(newCommitTime, 2);
-        }
-        client.startCommitWithTime(newCommitTime);
-        writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-        assertNoWriteErrors(writeStatuses);
+    for (int i = 3; i < 10; i++) {
+      if (i == 3) {
+        testTable.doWriteOperation("00" + i, UPSERT, singletonList("p3"), 
asList("p1", "p2", "p3"), 2);
+        syncTableMetadata(writeConfig);
+      } else {
+        testTable.doWriteOperation("00" + i, UPSERT, emptyList(), asList("p1", 
"p2", "p3"), 2);
       }
+    }
+    syncAndValidate(testTable, true);
 
-      // We can only rollback those commits whose deltacommit have not been 
archived yet.
-      int numRollbacks = 0;
-      boolean exceptionRaised = false;
-
-      List<HoodieInstant> allInstants = 
metaClient.reloadActiveTimeline().getCommitsTimeline().getReverseOrderedInstants()
-          .collect(Collectors.toList());
-      for (HoodieInstant instantToRollback : allInstants) {
-        try {
-          client.rollback(instantToRollback.getTimestamp());
-          client.syncTableMetadata();
-          ++numRollbacks;
-        } catch (HoodieMetadataException e) {
-          exceptionRaised = true;
-          break;
-        }
-      }
+    // We can only rollback those commits whose deltacommit have not been 
archived yet.
+    int numRollbacks = 0;
+    boolean exceptionRaised = false;
 
-      assertTrue(exceptionRaised, "Rollback of archived instants should fail");
-      // Since each rollback also creates a deltacommit, we can only support 
rolling back of half of the original
-      // instants present before rollback started.
-      assertTrue(numRollbacks >= Math.max(minArchiveCommitsDataset, 
minArchiveCommitsMetadata) / 2,
-          "Rollbacks of non archived instants should work");
+    List<HoodieInstant> allInstants = 
metaClient.reloadActiveTimeline().getCommitsTimeline().getReverseOrderedInstants().collect(Collectors.toList());
+    for (HoodieInstant instantToRollback : allInstants) {
+      try {
+        testTable.doRollback(instantToRollback.getTimestamp(), 
String.valueOf(Time.now()));
+        syncTableMetadata(writeConfig);
+        ++numRollbacks;
+      } catch (HoodieMetadataException e) {
+        exceptionRaised = true;
+        break;
+      }
     }
+
+    assertTrue(exceptionRaised, "Rollback of archived instants should fail");
+    // Since each rollback also creates a deltacommit, we can only support 
rolling back of half of the original
+    // instants present before rollback started.
+    assertTrue(numRollbacks >= Math.max(minArchiveCommitsDataset, 
minArchiveCommitsMetadata) / 2,
+        "Rollbacks of non archived instants should work");
   }
 
   /**
    * Test sync of table operations.
    */
   @ParameterizedTest
   @EnumSource(HoodieTableType.class)
-  @Disabled
   public void testSync(HoodieTableType tableType) throws Exception {
     init(tableType);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    String newCommitTime;
-    List<HoodieRecord> records;
-    List<WriteStatus> writeStatuses;
-
     // Initial commits without metadata table enabled
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
getWriteConfig(true, false))) {
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateInserts(newCommitTime, 5);
-      client.startCommitWithTime(newCommitTime);
-      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateInserts(newCommitTime, 5);
-      client.startCommitWithTime(newCommitTime);
-      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-    }
-
+    writeConfig = getWriteConfigBuilder(true, false, false).build();
+    testTable.doWriteOperation("001", BULK_INSERT, asList("p1", "p2"), 
asList("p1", "p2"), 1);
+    testTable.doWriteOperation("002", BULK_INSERT, asList("p1", "p2"), 1);
     // Enable metadata table so it initialized by listing from file system
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
getWriteConfig(true, true))) {
-      // inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateInserts(newCommitTime, 5);
-      writeStatuses = client.insert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-
-      validateMetadata(client);
-      assertTrue(metadata(client).isInSync());
-    }
-
+    testTable.doWriteOperation("003", INSERT, asList("p1", "p2"), 1);
+    syncAndValidate(testTable, emptyList(), true, true, true);
     // Various table operations without metadata table enabled
-    String restoreToInstant;
-    String inflightActionTimestamp;
-    String beforeInflightActionTimestamp;
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
getWriteConfig(true, false))) {
-      // updates
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 5);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
-
-      // updates and inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
-
-      // Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-        assertTrue(metadata(client).isInSync());
-      }
-
-      // Savepoint
-      restoreToInstant = newCommitTime;
-      if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
-        client.savepoint("hoodie", "metadata test");
-        assertTrue(metadata(client).isInSync());
-      }
-
-      // Record a timestamp for creating an inflight instance for sync testing
-      inflightActionTimestamp = HoodieActiveTimeline.createNewInstantTime();
-      beforeInflightActionTimestamp = newCommitTime;
-
-      // Deletes
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateDeletes(newCommitTime, 5);
-      JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> 
r.getKey());
-      client.startCommitWithTime(newCommitTime);
-      client.delete(deleteKeys, newCommitTime);
-      assertTrue(metadata(client).isInSync());
-
-      // Clean
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.clean(newCommitTime);
-      assertTrue(metadata(client).isInSync());
-
-      // updates
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
+    testTable.doWriteOperation("004", UPSERT, asList("p1", "p2"), 1);
+    testTable.doWriteOperation("005", UPSERT, singletonList("p3"), 
asList("p1", "p2", "p3"), 3);
+    syncAndValidate(testTable);
 
-      // insert overwrite to test replacecommit
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime, 
HoodieTimeline.REPLACE_COMMIT_ACTION);
-      records = dataGen.generateInserts(newCommitTime, 5);
-      HoodieWriteResult replaceResult = 
client.insertOverwrite(jsc.parallelize(records, 1), newCommitTime);
-      writeStatuses = replaceResult.getWriteStatuses().collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
+    // trigger compaction
+    if (MERGE_ON_READ.equals(tableType)) {
+      testTable = testTable.doCompaction("006", asList("p1", "p2"));
+      syncAndValidate(testTable);
     }
 
-    // If there is an incomplete operation, the Metadata Table is not updated 
beyond that operations but the
-    // in-memory merge should consider all the completed operations.
-    Path inflightCleanPath = new Path(metaClient.getMetaPath(), 
HoodieTimeline.makeInflightCleanerFileName(inflightActionTimestamp));
-    fs.create(inflightCleanPath).close();
-
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
getWriteConfig(true, true))) {
-      // Restore cannot be done until the metadata table is in sync. See 
HUDI-1502 for details
-      client.syncTableMetadata();
+    // trigger an upsert
+    testTable.doWriteOperation("007", UPSERT, asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, emptyList(), true, false, true);
 
-      // Table should sync only before the inflightActionTimestamp
-      HoodieBackedTableMetadataWriter writer =
-          (HoodieBackedTableMetadataWriter) 
SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), 
context);
-      assertEquals(writer.getMetadataReader().getUpdateTime().get(), 
beforeInflightActionTimestamp);
+    // savepoint
+    if (COPY_ON_WRITE.equals(tableType)) {
+      testTable.doSavepoint("007");
+      syncAndValidate(testTable);
+    }
 
-      // Reader should sync to all the completed instants
-      HoodieTableMetadata metadata = HoodieTableMetadata.create(context, 
client.getConfig().getMetadataConfig(),
-          client.getConfig().getBasePath(), 
FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
-      
assertEquals(((HoodieBackedTableMetadata)metadata).getReaderTime().get(), 
newCommitTime);
+    // trigger delete
+    testTable.doWriteOperation("008", DELETE, emptyList(), asList("p1", "p2", 
"p3"), 2);
+    syncAndValidate(testTable, emptyList(), true, true, false);
 
-      // Remove the inflight instance holding back table sync
-      fs.delete(inflightCleanPath, false);
-      client.syncTableMetadata();
+    // trigger clean
+    testTable.doCleanBasedOnCommits("009", asList("001", "002"));
+    syncAndValidate(testTable, emptyList(), true, false, false);
 
-      writer =
-          
(HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf,
 client.getConfig(), context);
-      assertEquals(writer.getMetadataReader().getUpdateTime().get(), 
newCommitTime);
+    // trigger another upsert
+    testTable.doWriteOperation("010", UPSERT, asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, emptyList(), true, false, false);
 
-      // Reader should sync to all the completed instants
-      metadata = HoodieTableMetadata.create(context, 
client.getConfig().getMetadataConfig(),
-          client.getConfig().getBasePath(), 
FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
-      assertEquals(writer.getMetadataReader().getUpdateTime().get(), 
newCommitTime);
-    }
+    // trigger clustering
+    testTable.doCluster("011", new HashMap<>());
+    syncAndValidate(testTable, emptyList(), true, true, false);
 
-    // Enable metadata table and ensure it is synced
+    // If there is an inflight operation, the Metadata Table is not updated 
beyond that operations but the
+    // in-memory merge should consider all the completed operations.
+    HoodieCommitMetadata inflightCommitMeta = 
testTable.doWriteOperation("007", UPSERT, emptyList(),
+        asList("p1", "p2", "p3"), 2, false, true);
+    // trigger upsert
+    testTable.doWriteOperation("013", UPSERT, emptyList(), asList("p1", "p2", 
"p3"), 2);
+    // testTable validation will fetch only files pertaining to completed 
commits. So, validateMetadata() will skip files for 006

Review comment:
       Please fix the comment w.r.t. the actual commit time — is it 006 or 007?

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
##########
@@ -612,214 +296,120 @@ public void testRollbackUnsyncedCommit(HoodieTableType 
tableType) throws Excepti
   @EnumSource(HoodieTableType.class)
   public void testManualRollbacks(HoodieTableType tableType) throws Exception {
     init(tableType);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    doWriteOperationsAndBootstrapMetadata(testTable);
 
     // Setting to archive more aggressively on the Metadata Table than the 
Dataset
     final int maxDeltaCommitsBeforeCompaction = 4;
     final int minArchiveCommitsMetadata = 2;
     final int minArchiveCommitsDataset = 4;
-    HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
+    writeConfig = getWriteConfigBuilder(true, true, false)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
             .archiveCommitsWith(minArchiveCommitsMetadata, 
minArchiveCommitsMetadata + 1).retainCommits(1)
             
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
         
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(minArchiveCommitsDataset,
 minArchiveCommitsDataset + 1)
             
.retainCommits(1).retainFileVersions(1).withAutoClean(false).withAsyncClean(true).build())
         .build();
-
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
config)) {
-      // Initialize table with metadata
-      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
-      client.startCommitWithTime(newCommitTime);
-      List<WriteStatus> writeStatuses = 
client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
-
-      // Perform multiple commits
-      for (int i = 1; i < 10; ++i) {
-        newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-        if (i == 1) {
-          records = dataGen.generateInserts(newCommitTime, 5);
-        } else {
-          records = dataGen.generateUpdates(newCommitTime, 2);
-        }
-        client.startCommitWithTime(newCommitTime);
-        writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-        assertNoWriteErrors(writeStatuses);
+    for (int i = 3; i < 10; i++) {
+      if (i == 3) {
+        testTable.doWriteOperation("00" + i, UPSERT, singletonList("p3"), 
asList("p1", "p2", "p3"), 2);
+        syncTableMetadata(writeConfig);
+      } else {
+        testTable.doWriteOperation("00" + i, UPSERT, emptyList(), asList("p1", 
"p2", "p3"), 2);
       }
+    }
+    syncAndValidate(testTable, true);
 
-      // We can only rollback those commits whose deltacommit have not been 
archived yet.
-      int numRollbacks = 0;
-      boolean exceptionRaised = false;
-
-      List<HoodieInstant> allInstants = 
metaClient.reloadActiveTimeline().getCommitsTimeline().getReverseOrderedInstants()
-          .collect(Collectors.toList());
-      for (HoodieInstant instantToRollback : allInstants) {
-        try {
-          client.rollback(instantToRollback.getTimestamp());
-          client.syncTableMetadata();
-          ++numRollbacks;
-        } catch (HoodieMetadataException e) {
-          exceptionRaised = true;
-          break;
-        }
-      }
+    // We can only rollback those commits whose deltacommit have not been 
archived yet.
+    int numRollbacks = 0;
+    boolean exceptionRaised = false;
 
-      assertTrue(exceptionRaised, "Rollback of archived instants should fail");
-      // Since each rollback also creates a deltacommit, we can only support 
rolling back of half of the original
-      // instants present before rollback started.
-      assertTrue(numRollbacks >= Math.max(minArchiveCommitsDataset, 
minArchiveCommitsMetadata) / 2,
-          "Rollbacks of non archived instants should work");
+    List<HoodieInstant> allInstants = 
metaClient.reloadActiveTimeline().getCommitsTimeline().getReverseOrderedInstants().collect(Collectors.toList());
+    for (HoodieInstant instantToRollback : allInstants) {
+      try {
+        testTable.doRollback(instantToRollback.getTimestamp(), 
String.valueOf(Time.now()));
+        syncTableMetadata(writeConfig);
+        ++numRollbacks;
+      } catch (HoodieMetadataException e) {
+        exceptionRaised = true;
+        break;
+      }
     }
+
+    assertTrue(exceptionRaised, "Rollback of archived instants should fail");
+    // Since each rollback also creates a deltacommit, we can only support 
rolling back of half of the original
+    // instants present before rollback started.
+    assertTrue(numRollbacks >= Math.max(minArchiveCommitsDataset, 
minArchiveCommitsMetadata) / 2,
+        "Rollbacks of non archived instants should work");
   }
 
   /**
    * Test sync of table operations.
    */
   @ParameterizedTest
   @EnumSource(HoodieTableType.class)
-  @Disabled
   public void testSync(HoodieTableType tableType) throws Exception {
     init(tableType);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-
-    String newCommitTime;
-    List<HoodieRecord> records;
-    List<WriteStatus> writeStatuses;
-
     // Initial commits without metadata table enabled
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
getWriteConfig(true, false))) {
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateInserts(newCommitTime, 5);
-      client.startCommitWithTime(newCommitTime);
-      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateInserts(newCommitTime, 5);
-      client.startCommitWithTime(newCommitTime);
-      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-    }
-
+    writeConfig = getWriteConfigBuilder(true, false, false).build();
+    testTable.doWriteOperation("001", BULK_INSERT, asList("p1", "p2"), 
asList("p1", "p2"), 1);
+    testTable.doWriteOperation("002", BULK_INSERT, asList("p1", "p2"), 1);
     // Enable metadata table so it initialized by listing from file system
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
getWriteConfig(true, true))) {
-      // inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateInserts(newCommitTime, 5);
-      writeStatuses = client.insert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-
-      validateMetadata(client);
-      assertTrue(metadata(client).isInSync());
-    }
-
+    testTable.doWriteOperation("003", INSERT, asList("p1", "p2"), 1);
+    syncAndValidate(testTable, emptyList(), true, true, true);
     // Various table operations without metadata table enabled
-    String restoreToInstant;
-    String inflightActionTimestamp;
-    String beforeInflightActionTimestamp;
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
getWriteConfig(true, false))) {
-      // updates
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 5);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
-
-      // updates and inserts
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
-
-      // Compaction
-      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
-        newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
-        client.compact(newCommitTime);
-        assertTrue(metadata(client).isInSync());
-      }
-
-      // Savepoint
-      restoreToInstant = newCommitTime;
-      if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
-        client.savepoint("hoodie", "metadata test");
-        assertTrue(metadata(client).isInSync());
-      }
-
-      // Record a timestamp for creating an inflight instance for sync testing
-      inflightActionTimestamp = HoodieActiveTimeline.createNewInstantTime();
-      beforeInflightActionTimestamp = newCommitTime;
-
-      // Deletes
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      records = dataGen.generateDeletes(newCommitTime, 5);
-      JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> 
r.getKey());
-      client.startCommitWithTime(newCommitTime);
-      client.delete(deleteKeys, newCommitTime);
-      assertTrue(metadata(client).isInSync());
-
-      // Clean
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.clean(newCommitTime);
-      assertTrue(metadata(client).isInSync());
-
-      // updates
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime);
-      records = dataGen.generateUniqueUpdates(newCommitTime, 10);
-      writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
+    testTable.doWriteOperation("004", UPSERT, asList("p1", "p2"), 1);
+    testTable.doWriteOperation("005", UPSERT, singletonList("p3"), 
asList("p1", "p2", "p3"), 3);
+    syncAndValidate(testTable);
 
-      // insert overwrite to test replacecommit
-      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-      client.startCommitWithTime(newCommitTime, 
HoodieTimeline.REPLACE_COMMIT_ACTION);
-      records = dataGen.generateInserts(newCommitTime, 5);
-      HoodieWriteResult replaceResult = 
client.insertOverwrite(jsc.parallelize(records, 1), newCommitTime);
-      writeStatuses = replaceResult.getWriteStatuses().collect();
-      assertNoWriteErrors(writeStatuses);
-      assertTrue(metadata(client).isInSync());
+    // trigger compaction
+    if (MERGE_ON_READ.equals(tableType)) {
+      testTable = testTable.doCompaction("006", asList("p1", "p2"));
+      syncAndValidate(testTable);
     }
 
-    // If there is an incomplete operation, the Metadata Table is not updated 
beyond that operations but the
-    // in-memory merge should consider all the completed operations.
-    Path inflightCleanPath = new Path(metaClient.getMetaPath(), 
HoodieTimeline.makeInflightCleanerFileName(inflightActionTimestamp));
-    fs.create(inflightCleanPath).close();
-
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
getWriteConfig(true, true))) {
-      // Restore cannot be done until the metadata table is in sync. See 
HUDI-1502 for details
-      client.syncTableMetadata();
+    // trigger an upsert
+    testTable.doWriteOperation("007", UPSERT, asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, emptyList(), true, false, true);
 
-      // Table should sync only before the inflightActionTimestamp
-      HoodieBackedTableMetadataWriter writer =
-          (HoodieBackedTableMetadataWriter) 
SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), 
context);
-      assertEquals(writer.getMetadataReader().getUpdateTime().get(), 
beforeInflightActionTimestamp);
+    // savepoint
+    if (COPY_ON_WRITE.equals(tableType)) {
+      testTable.doSavepoint("007");
+      syncAndValidate(testTable);
+    }
 
-      // Reader should sync to all the completed instants
-      HoodieTableMetadata metadata = HoodieTableMetadata.create(context, 
client.getConfig().getMetadataConfig(),
-          client.getConfig().getBasePath(), 
FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
-      
assertEquals(((HoodieBackedTableMetadata)metadata).getReaderTime().get(), 
newCommitTime);
+    // trigger delete
+    testTable.doWriteOperation("008", DELETE, emptyList(), asList("p1", "p2", 
"p3"), 2);
+    syncAndValidate(testTable, emptyList(), true, true, false);
 
-      // Remove the inflight instance holding back table sync
-      fs.delete(inflightCleanPath, false);
-      client.syncTableMetadata();
+    // trigger clean
+    testTable.doCleanBasedOnCommits("009", asList("001", "002"));
+    syncAndValidate(testTable, emptyList(), true, false, false);
 
-      writer =
-          
(HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf,
 client.getConfig(), context);
-      assertEquals(writer.getMetadataReader().getUpdateTime().get(), 
newCommitTime);
+    // trigger another upsert
+    testTable.doWriteOperation("010", UPSERT, asList("p1", "p2", "p3"), 2);
+    syncAndValidate(testTable, emptyList(), true, false, false);
 
-      // Reader should sync to all the completed instants
-      metadata = HoodieTableMetadata.create(context, 
client.getConfig().getMetadataConfig(),
-          client.getConfig().getBasePath(), 
FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
-      assertEquals(writer.getMetadataReader().getUpdateTime().get(), 
newCommitTime);
-    }
+    // trigger clustering
+    testTable.doCluster("011", new HashMap<>());
+    syncAndValidate(testTable, emptyList(), true, true, false);
 
-    // Enable metadata table and ensure it is synced
+    // If there is an inflight operation, the Metadata Table is not updated 
beyond that operations but the
+    // in-memory merge should consider all the completed operations.
+    HoodieCommitMetadata inflightCommitMeta = 
testTable.doWriteOperation("007", UPSERT, emptyList(),

Review comment:
       I guess we should avoid reusing commit time 007 at line 271, since it was already used earlier. Let's make that 008 and adjust the 
subsequent commit times accordingly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to