This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bfb26189b8 [HUDI-9424] Fixing few tests that were not running in CI  (#13355)
0bfb26189b8 is described below

commit 0bfb26189b89335334a98ed238cc37c677a03b36
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun May 25 08:32:22 2025 -0700

    [HUDI-9424] Fixing few tests that were not running in CI  (#13355)
---
 .../SparkHoodieBackedTableMetadataWriter.java      |  5 +-
 .../hudi/testutils/HoodieClientTestBase.java       |  2 +-
 .../functional/TestConsistentBucketIndex.java      |  2 +-
 .../functional/TestHoodieBackedMetadata.java       | 59 ++++++++++++++++++----
 .../{client => }/functional/TestHoodieIndex.java   |  3 +-
 .../TestSavepointRestoreMergeOnRead.java           |  4 +-
 .../functional/TestSparkSortAndSizeClustering.java |  1 -
 .../table/functional/TestHoodieSparkRollback.java  |  2 +-
 8 files changed, 62 insertions(+), 16 deletions(-)
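
A note on the pattern behind most of the changes below: several test classes move
from org.apache.hudi.client.functional to org.apache.hudi.functional (which appears
to be why CI was not picking them up), and the test write paths now force execution
with collect() and finalize each instant with an explicit commit. A minimal sketch
of that write-then-commit pattern, assuming the fixtures used throughout these tests
(client, jsc, dataGen-generated records):

    import java.util.List;

    import org.apache.hudi.client.SparkRDDWriteClient;
    import org.apache.hudi.client.WriteClientTestUtils;
    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.spark.api.java.JavaSparkContext;

    // Hypothetical helper (not part of this patch) showing the recurring pattern.
    final class WriteThenCommit {
      static List<WriteStatus> writeAndCommit(SparkRDDWriteClient client, JavaSparkContext jsc,
                                              List<HoodieRecord> records, String commitTime) {
        WriteClientTestUtils.startCommitWithTime(client, commitTime);
        // insert(...) returns a lazy JavaRDD<WriteStatus>; collect() forces the write
        // to run so the statuses can actually be asserted on.
        List<WriteStatus> statuses = client.insert(jsc.parallelize(records, 1), commitTime).collect();
        // Explicitly finalize the instant; otherwise it stays inflight and
        // timeline-based validation sees no completed commit.
        client.commit(commitTime, jsc.parallelize(statuses));
        return statuses;
      }
    }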

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 0c255551f29..3dcb60db62f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.metadata;
 
 import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.utils.SparkMetadataWriterUtils;
@@ -66,6 +67,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex;
 
@@ -173,7 +175,8 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     SparkRDDWriteClient writeClient = (SparkRDDWriteClient) getWriteClient();
     String actionType = CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, HoodieTableType.MERGE_ON_READ);
     writeClient.startCommitForMetadataTable(metadataMetaClient, instantTime, actionType);
-    writeClient.deletePartitions(partitionsToDrop, instantTime);
+    HoodieWriteResult result = writeClient.deletePartitions(partitionsToDrop, instantTime);
+    writeClient.commit(instantTime, result.getWriteStatuses(), Option.empty(), REPLACE_COMMIT_ACTION, result.getPartitionToReplaceFileIds());
   }
 
   /**
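
On the change above: dropping metadata-table partitions is a replace operation, so the
write is no longer left uncommitted; the HoodieWriteResult is captured and committed
explicitly as a replacecommit, which records the replaced file ids per partition. A
brief sketch of the sequence introduced above, with names taken from the patch:

    // deletePartitions(...) returns a HoodieWriteResult rather than committing on its own.
    HoodieWriteResult result = writeClient.deletePartitions(partitionsToDrop, instantTime);
    // Commit under REPLACE_COMMIT_ACTION so the replaced file groups are tracked.
    writeClient.commit(instantTime, result.getWriteStatuses(), Option.empty(),
        REPLACE_COMMIT_ACTION, result.getPartitionToReplaceFileIds());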
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 7f1e48adad3..13b26f0a14c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -668,7 +668,7 @@ public class HoodieClientTestBase extends HoodieSparkClientTestHarness {
       WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
       List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, baseRecordsToUpdate);
       JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-      client.upsert(writeRecords, newCommitTime);
+      client.upsert(writeRecords, newCommitTime).collect();
     }
   }
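
The small change above is behavioral, not cosmetic: upsert(...) only builds a lazy
JavaRDD, so without an action the update batch never executed and
updateBatchWithoutCommit left nothing behind for callers to roll back. A
self-contained illustration of that Spark laziness (a hypothetical demo, not code
from this patch):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class LazyRddDemo {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("lazy-demo").setMaster("local[1]");
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
          JavaRDD<Integer> doubled = jsc.parallelize(Arrays.asList(1, 2, 3))
              .map(x -> {
                // This side effect only happens once an action runs.
                System.out.println("executing for " + x);
                return x * 2;
              });
          // Without this action the map above never executes -- just as the test's
          // upsert wrote nothing before .collect() was added.
          List<Integer> result = doubled.collect();
          System.out.println(result); // [2, 4, 6]
        }
      }
    }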
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestConsistentBucketIndex.java
similarity index 99%
rename from hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestConsistentBucketIndex.java
index 474db9d7ed2..f52fc121b68 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestConsistentBucketIndex.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.client.functional;
+package org.apache.hudi.functional;
 
 import org.apache.hudi.client.WriteClientTestUtils;
 import org.apache.hudi.client.WriteStatus;
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
similarity index 98%
rename from hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index 7e49ba7ef3f..7ec558ae858 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.client.functional;
+package org.apache.hudi.functional;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -28,6 +28,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteClientTestUtils;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.client.functional.TestHoodieMetadataBase;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
@@ -1759,6 +1760,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       List<WriteStatus> writeStatuses =
           client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(newCommitTime, jsc.parallelize(writeStatuses));
       validateMetadata(client);
 
       // Metadata table should exist
@@ -1806,6 +1808,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
       List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(newCommitTime, jsc.parallelize(writeStatuses));
 
       // Metadata table is recreated, during bootstrapping of metadata table.
       metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -1841,6 +1844,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       WriteClientTestUtils.startCommitWithTime(client, commitTime);
       List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 1), commitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(commitTime, jsc.parallelize(writeStatuses));
       validateMetadata(client);
 
       // Rollback the first commit
@@ -1852,6 +1856,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       WriteClientTestUtils.startCommitWithTime(client, commitTime);
       writeStatuses = client.upsert(jsc.parallelize(records, 1), commitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(commitTime, jsc.parallelize(writeStatuses));
       validateMetadata(client);
     }
   }
@@ -1950,6 +1955,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       WriteClientTestUtils.startCommitWithTime(client, firstCommit);
       List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(processedRecords, 1), firstCommit).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(firstCommit, jsc.parallelize(writeStatuses));
 
       // Write 2 (inserts)
       String secondCommit = "0000002";
@@ -1961,6 +1967,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
           .collect(Collectors.toList());
       writeStatuses = client.insert(jsc.parallelize(processedRecords, 1), secondCommit).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(secondCommit, jsc.parallelize(writeStatuses));
 
       Map<String, Map<String, List<String>>> commitToPartitionsToFiles = new HashMap<>();
       // populate commit -> partition -> file info to assist in validation and prefix search
@@ -2052,6 +2059,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     List<WriteStatus> writeStatuses =
         client.bulkInsert(jsc.parallelize(records, 1), commit1).collect();
     assertNoWriteErrors(writeStatuses);
+    client.commit(commit1, jsc.parallelize(writeStatuses));
 
     // Write 2 (inserts)
     String commit2 = client.createNewInstantTime();
@@ -2059,6 +2067,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     records = dataGen.generateInserts(commit2, 20);
     writeStatuses = client.insert(jsc.parallelize(records, 1), commit2).collect();
     assertNoWriteErrors(writeStatuses);
+    client.commit(commit2, jsc.parallelize(writeStatuses));
+
     // remove latest completed delta commit from MDT.
     StoragePath toDelete = HoodieTestUtils.getCompleteInstantPath(
         metaClient.getStorage(),
@@ -2074,6 +2084,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     records = dataGen.generateUniqueUpdates(commit3, 10);
     writeStatuses = client.upsert(jsc.parallelize(records, 1), commit3).collect();
     assertNoWriteErrors(writeStatuses);
+    client.commit(commit3, jsc.parallelize(writeStatuses));
 
     // ensure that 000003 is after rollback of the partially failed 2nd commit.
     HoodieTableMetaClient metadataMetaClient = HoodieTestUtils.createMetaClient(
@@ -2120,6 +2131,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
       writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(newCommitTime, jsc.parallelize(writeStatuses));
       validateMetadata(client);
 
       // Write 2 (inserts)
@@ -2130,6 +2142,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       records = dataGen.generateInserts(newCommitTime, 20);
       writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(newCommitTime, jsc.parallelize(writeStatuses));
       validateMetadata(client);
 
       // Write 3 (updates)
@@ -2138,6 +2151,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       records = dataGen.generateUniqueUpdates(newCommitTime, 10);
       writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(newCommitTime, jsc.parallelize(writeStatuses));
 
       // Write 4 (updates and inserts)
       newCommitTime = client.createNewInstantTime();
@@ -2145,6 +2159,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       records = dataGen.generateUpdates(newCommitTime, 10);
       writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(newCommitTime, jsc.parallelize(writeStatuses));
       validateMetadata(client);
 
       // Compaction
@@ -2164,6 +2179,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       records = dataGen.generateUpdates(newCommitTime, 5);
       writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(newCommitTime, jsc.parallelize(writeStatuses));
       validateMetadata(client);
 
     }
@@ -2186,6 +2202,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       records = dataGen.generateUpdates(newCommitTime, 5);
       writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(newCommitTime, jsc.parallelize(writeStatuses));
 
       // Clean
       newCommitTime = client.createNewInstantTime();
@@ -2225,6 +2242,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     WriteClientTestUtils.startCommitWithTime(writeClient, initialCommit);
     List<WriteStatus> initialWriteStatuses = writeClient.insert(jsc.parallelize(initialRecords, 1), initialCommit).collect();
     assertNoWriteErrors(initialWriteStatuses);
+    writeClient.commit(initialCommit, jsc.parallelize(initialWriteStatuses));
     writeClient.close();
 
     ExecutorService executors = Executors.newFixedThreadPool(dataGen.getPartitionPaths().length);
@@ -2245,6 +2263,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
         WriteClientTestUtils.startCommitWithTime(writeClient, newCommitTime);
         List<WriteStatus> writeStatuses = localWriteClient.insert(jsc.parallelize(records, 1), newCommitTime).collect();
         assertNoWriteErrors(writeStatuses);
+        localWriteClient.commit(newCommitTime, jsc.parallelize(writeStatuses));
       });
       futures.add(future);
     }
@@ -2471,6 +2490,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       WriteClientTestUtils.startCommitWithTime(client, commitTimestamps[0]);
       writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), commitTimestamps[0]).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(commitTimestamps[0], jsc.parallelize(writeStatuses));
 
       // make all commits to inflight in metadata table. Still read should go through, just that it may not return any data.
       FileCreateUtilsLegacy.deleteDeltaCommit(basePath + "/.hoodie/metadata/", commitTimestamps[0]);
@@ -2500,6 +2520,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
         WriteClientTestUtils.startCommitWithTime(client, commitTimestamps[i]);
         writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), commitTimestamps[i]).collect();
         assertNoWriteErrors(writeStatuses);
+        client.commit(commitTimestamps[i], jsc.parallelize(writeStatuses));
       }
 
       // Ensure we can see files from each commit
@@ -2813,6 +2834,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       }
       List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(upsertRecords, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(newCommitTime, jsc.parallelize(writeStatuses));
       validateMetadata(client);
 
       newCommitTime = client.createNewInstantTime();
@@ -2820,6 +2842,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       records = dataGen.generateInserts(newCommitTime, 20);
       writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(newCommitTime, jsc.parallelize(writeStatuses));
       validateMetadata(client);
 
       // There is no way to simulate failed commit on the main dataset, hence we simply delete the completed
@@ -2838,6 +2861,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
       List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(newCommitTime, jsc.parallelize(writeStatuses));
       validateMetadata(client);
     }
   }
@@ -2926,6 +2950,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       List<WriteStatus> writeStatuses =
           client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(newCommitTime, jsc.parallelize(writeStatuses));
       validateMetadata(client);
     }
 
@@ -2946,6 +2971,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       List<WriteStatus> writeStatuses =
           client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(newCommitTime, jsc.parallelize(writeStatuses));
     }
 
     // Metadata table is recreated, during bootstrapping of metadata table.
@@ -3040,6 +3066,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
       List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(newCommitTime, jsc.parallelize(writeStatuses));
       validateMetadata(client);
 
       newCommitTime = client.createNewInstantTime();
@@ -3047,6 +3074,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       records = dataGen.generateInserts(newCommitTime, 5);
       writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(newCommitTime, jsc.parallelize(writeStatuses));
       validateMetadata(client);
 
       // There is no way to simulate failed commit on the main dataset, hence we simply delete the completed
@@ -3065,6 +3093,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 5);
       List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(newCommitTime, jsc.parallelize(writeStatuses));
 
       // Post rollback commit and metadata should be valid
       validateMetadata(client);
@@ -3137,6 +3166,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       List<HoodieRecord> records = nonPartitionedGenerator.generateInserts(newCommitTime, 10);
       WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
       List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      client.commit(newCommitTime, jsc.parallelize(writeStatuses));
       validateMetadata(client);
 
       List<String> metadataPartitions = metadata(client, storage).getAllPartitionPaths();
@@ -3158,6 +3188,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       List<HoodieRecord> records = nonPartitionedGenerator.generateInserts(newCommitTime, 10);
       WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
       List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      client.commit(newCommitTime, jsc.parallelize(writeStatuses));
       validateMetadata(client);
 
       List<String> metadataPartitions = metadata(client, storage).getAllPartitionPaths();
@@ -3245,6 +3276,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
       List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(newCommitTime, jsc.parallelize(writeStatuses));
       validateMetadata(client);
 
       Metrics metrics = Metrics.getInstance(writeConfig.getMetricsConfig(), storage);
@@ -3291,6 +3323,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       List<WriteStatus> writeStatuses =
           client.insert(jsc.parallelize(recordsFirstBatch, 1), firstCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      client.commit(firstCommitTime, jsc.parallelize(writeStatuses));
       commitTimestamps.add(firstCommitTime);
     }
     assertEquals(false,
@@ -3311,7 +3344,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       String secondCommitTime = client.createNewInstantTime();
       List<HoodieRecord> recordsSecondBatch = dataGen.generateInserts(secondCommitTime, 100);
       WriteClientTestUtils.startCommitWithTime(client, secondCommitTime);
-      assertThrows(HoodieException.class, () -> client.insert(jsc.parallelize(recordsSecondBatch, 1), secondCommitTime));
+      assertThrows(HoodieException.class, () -> client.insert(jsc.parallelize(recordsSecondBatch, 1), secondCommitTime).collect());
     }
   }
 
@@ -3340,7 +3373,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
         List<HoodieRecord> records = index == 0 ? dataGen.generateInsertsForPartition(newCommitTime, 10, partition)
             : dataGen.generateUniqueUpdates(newCommitTime, 5);
         WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
-        client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+        List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+        client.commit(newCommitTime, jsc.parallelize(writeStatuses));
       }
     }
     assertEquals(metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 3);
@@ -3551,7 +3585,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       assertTrue(metadataWriteClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty()));
       HoodieWriteMetadata result = metadataWriteClient.compact(compactionInstantTime);
       metadataWriteClient.commitCompaction(compactionInstantTime, result, Option.empty());
-      assertTrue(metaClient.reloadActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime));
+      // validate that compaction has completed in the metadata table.
+      assertTrue(((SparkHoodieBackedTableMetadataWriter) metadataWriter).getTableMetadata().getMetadataMetaClient().reloadActiveTimeline().filterCompletedInstants()
+          .containsInstant(compactionInstantTime));
 
       // verify metadata table
       validateMetadata(client);
@@ -3585,7 +3621,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       // First commit
       List<HoodieRecord> firstBatchOfrecords = dataGen.generateInserts(firstCommitTime, 10);
       WriteClientTestUtils.startCommitWithTime(client, firstCommitTime);
-      client.insert(jsc.parallelize(firstBatchOfrecords, 1), firstCommitTime).collect();
+      List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(firstBatchOfrecords, 1), firstCommitTime).collect();
+      client.commit(firstCommitTime, jsc.parallelize(writeStatuses));
 
       // Records got inserted and RI is initialized
       metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -3597,7 +3634,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       secondCommitTime = client.createNewInstantTime();
       List<HoodieRecord> secondBatchOfrecords = dataGen.generateInserts(secondCommitTime, 5);
       WriteClientTestUtils.startCommitWithTime(client, secondCommitTime);
-      client.bulkInsert(jsc.parallelize(secondBatchOfrecords, 1), secondCommitTime).collect();
+      writeStatuses = client.bulkInsert(jsc.parallelize(secondBatchOfrecords, 1), secondCommitTime).collect();
+      client.commit(secondCommitTime, jsc.parallelize(writeStatuses));
 
       assertEquals(secondBatchOfrecords.size(),
           HoodieClientTestUtils.readCommit(writeConfig.getBasePath(), engineContext.getSqlContext(), metaClient.reloadActiveTimeline(), secondCommitTime, true, INSTANT_GENERATOR).count());
@@ -3619,7 +3657,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
 
       String deleteTime = client.createNewInstantTime();
       WriteClientTestUtils.startCommitWithTime(client, deleteTime);
-      client.delete(jsc.parallelize(recordsToDelete, 1).map(HoodieRecord::getKey), deleteTime);
+      writeStatuses = client.delete(jsc.parallelize(recordsToDelete, 1).map(HoodieRecord::getKey), deleteTime).collect();
+      client.commit(deleteTime, jsc.parallelize(writeStatuses));
 
       // RI should not return mappings for deleted records
       metadataReader = HoodieTableMetadata.create(
@@ -3634,7 +3673,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
       // An empty delete to trigger compaction
       String deleteTime = client.startCommit();
-      client.delete(jsc.emptyRDD(), deleteTime);
+      List<WriteStatus> writeStatuses = client.delete(jsc.emptyRDD(), deleteTime).collect();
+      client.commit(deleteTime, jsc.parallelize(writeStatuses));
 
       HoodieTableMetadata metadataReader = HoodieTableMetadata.create(
           context, storage, writeConfig.getMetadataConfig(), writeConfig.getBasePath());
@@ -3648,7 +3688,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
 
       // Adding records with the same keys after delete should work
       String reinsertTime = client.startCommit();
-      client.upsert(jsc.parallelize(recordsToDelete, 1), reinsertTime).collect();
+      writeStatuses = client.upsert(jsc.parallelize(recordsToDelete, 1), reinsertTime).collect();
+      client.commit(reinsertTime, jsc.parallelize(writeStatuses));
 
       // New mappings should have been created for re-inserted records and should map to the new commit time
       metadataReader = HoodieTableMetadata.create(context, storage, writeConfig.getMetadataConfig(), writeConfig.getBasePath());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
similarity index 99%
rename from hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
index 9d5042fb51c..68e7909265f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
@@ -16,10 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.client.functional;
+package org.apache.hudi.functional;
 
 import org.apache.hudi.client.WriteClientTestUtils;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.functional.TestHoodieMetadataBase;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java
similarity index 99%
rename from hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java
index ec1f94526ab..365087cca9c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.client.functional;
+package org.apache.hudi.functional;
 
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -316,6 +317,7 @@ public class TestSavepointRestoreMergeOnRead extends HoodieClientTestBase {
       updateBatchWithoutCommit(client.createNewInstantTime(),
           Objects.requireNonNull(baseRecordsToUpdate, "The records to update should not be null"));
       // rollback the delta_commit
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       assertTrue(client.rollbackFailedWrites(metaClient), "The last delta_commit should be rolled back");
 
       // another update
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
index f7bd900cd41..8647fbcbb11 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
@@ -63,7 +63,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class TestSparkSortAndSizeClustering extends HoodieSparkClientTestHarness {
 
-
   private HoodieWriteConfig config;
   private HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0);
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkRollback.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkRollback.java
index f774550b8ef..1f4416ea354 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkRollback.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkRollback.java
@@ -24,7 +24,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteClientTestUtils;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.client.functional.TestHoodieBackedMetadata;
+import org.apache.hudi.functional.TestHoodieBackedMetadata;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieDeltaWriteStat;
