This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0bfb26189b8 [HUDI-9424] Fixing a few tests that were not running in CI
(#13355)
0bfb26189b8 is described below
commit 0bfb26189b89335334a98ed238cc37c677a03b36
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun May 25 08:32:22 2025 -0700
[HUDI-9424] Fixing a few tests that were not running in CI (#13355)
---
.../SparkHoodieBackedTableMetadataWriter.java | 5 +-
.../hudi/testutils/HoodieClientTestBase.java | 2 +-
.../functional/TestConsistentBucketIndex.java | 2 +-
.../functional/TestHoodieBackedMetadata.java | 59 ++++++++++++++++++----
.../{client => }/functional/TestHoodieIndex.java | 3 +-
.../TestSavepointRestoreMergeOnRead.java | 4 +-
.../functional/TestSparkSortAndSizeClustering.java | 1 -
.../table/functional/TestHoodieSparkRollback.java | 2 +-
8 files changed, 62 insertions(+), 16 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 0c255551f29..3dcb60db62f 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -19,6 +19,7 @@
package org.apache.hudi.metadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.SparkMetadataWriterUtils;
@@ -66,6 +67,7 @@ import java.util.stream.Collectors;
import static
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex;
@@ -173,7 +175,8 @@ public class SparkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
SparkRDDWriteClient writeClient = (SparkRDDWriteClient) getWriteClient();
String actionType =
CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION,
HoodieTableType.MERGE_ON_READ);
writeClient.startCommitForMetadataTable(metadataMetaClient, instantTime,
actionType);
- writeClient.deletePartitions(partitionsToDrop, instantTime);
+ HoodieWriteResult result = writeClient.deletePartitions(partitionsToDrop,
instantTime);
+ writeClient.commit(instantTime, result.getWriteStatuses(), Option.empty(),
REPLACE_COMMIT_ACTION, result.getPartitionToReplaceFileIds());
}
/**
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 7f1e48adad3..13b26f0a14c 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -668,7 +668,7 @@ public class HoodieClientTestBase extends
HoodieSparkClientTestHarness {
WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime,
baseRecordsToUpdate);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
- client.upsert(writeRecords, newCommitTime);
+ client.upsert(writeRecords, newCommitTime).collect();
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestConsistentBucketIndex.java
similarity index 99%
rename from
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestConsistentBucketIndex.java
index 474db9d7ed2..f52fc121b68 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestConsistentBucketIndex.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.client.functional;
+package org.apache.hudi.functional;
import org.apache.hudi.client.WriteClientTestUtils;
import org.apache.hudi.client.WriteStatus;
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
similarity index 98%
rename from
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index 7e49ba7ef3f..7ec558ae858 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.client.functional;
+package org.apache.hudi.functional;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -28,6 +28,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteClientTestUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.client.functional.TestHoodieMetadataBase;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
@@ -1759,6 +1760,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
List<WriteStatus> writeStatuses =
client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
validateMetadata(client);
// Metadata table should exist
@@ -1806,6 +1808,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records,
1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
// Metadata table is recreated, during bootstrapping of metadata table.
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -1841,6 +1844,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
WriteClientTestUtils.startCommitWithTime(client, commitTime);
List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records,
1), commitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(commitTime, jsc.parallelize(writeStatuses));
validateMetadata(client);
// Rollback the first commit
@@ -1852,6 +1856,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
WriteClientTestUtils.startCommitWithTime(client, commitTime);
writeStatuses = client.upsert(jsc.parallelize(records, 1),
commitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(commitTime, jsc.parallelize(writeStatuses));
validateMetadata(client);
}
}
@@ -1950,6 +1955,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
WriteClientTestUtils.startCommitWithTime(client, firstCommit);
List<WriteStatus> writeStatuses =
client.insert(jsc.parallelize(processedRecords, 1), firstCommit).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(firstCommit, jsc.parallelize(writeStatuses));
// Write 2 (inserts)
String secondCommit = "0000002";
@@ -1961,6 +1967,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
.collect(Collectors.toList());
writeStatuses = client.insert(jsc.parallelize(processedRecords, 1),
secondCommit).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(secondCommit, jsc.parallelize(writeStatuses));
Map<String, Map<String, List<String>>> commitToPartitionsToFiles = new
HashMap<>();
// populate commit -> partition -> file info to assist in validation and
prefix search
@@ -2052,6 +2059,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
List<WriteStatus> writeStatuses =
client.bulkInsert(jsc.parallelize(records, 1), commit1).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(commit1, jsc.parallelize(writeStatuses));
// Write 2 (inserts)
String commit2 = client.createNewInstantTime();
@@ -2059,6 +2067,8 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
records = dataGen.generateInserts(commit2, 20);
writeStatuses = client.insert(jsc.parallelize(records, 1),
commit2).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(commit2, jsc.parallelize(writeStatuses));
+
// remove latest completed delta commit from MDT.
StoragePath toDelete = HoodieTestUtils.getCompleteInstantPath(
metaClient.getStorage(),
@@ -2074,6 +2084,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
records = dataGen.generateUniqueUpdates(commit3, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1),
commit3).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(commit3, jsc.parallelize(writeStatuses));
// ensure that 000003 is after rollback of the partially failed 2nd commit.
HoodieTableMetaClient metadataMetaClient =
HoodieTestUtils.createMetaClient(
@@ -2120,6 +2131,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
writeStatuses = client.bulkInsert(jsc.parallelize(records, 1),
newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
validateMetadata(client);
// Write 2 (inserts)
@@ -2130,6 +2142,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
records = dataGen.generateInserts(newCommitTime, 20);
writeStatuses = client.insert(jsc.parallelize(records, 1),
newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
validateMetadata(client);
// Write 3 (updates)
@@ -2138,6 +2151,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
records = dataGen.generateUniqueUpdates(newCommitTime, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1),
newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
// Write 4 (updates and inserts)
newCommitTime = client.createNewInstantTime();
@@ -2145,6 +2159,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
records = dataGen.generateUpdates(newCommitTime, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1),
newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
validateMetadata(client);
// Compaction
@@ -2164,6 +2179,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
records = dataGen.generateUpdates(newCommitTime, 5);
writeStatuses = client.upsert(jsc.parallelize(records, 1),
newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
validateMetadata(client);
}
@@ -2186,6 +2202,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
records = dataGen.generateUpdates(newCommitTime, 5);
writeStatuses = client.upsert(jsc.parallelize(records, 1),
newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
// Clean
newCommitTime = client.createNewInstantTime();
@@ -2225,6 +2242,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
WriteClientTestUtils.startCommitWithTime(writeClient, initialCommit);
List<WriteStatus> initialWriteStatuses =
writeClient.insert(jsc.parallelize(initialRecords, 1), initialCommit).collect();
assertNoWriteErrors(initialWriteStatuses);
+ writeClient.commit(initialCommit, jsc.parallelize(initialWriteStatuses));
writeClient.close();
ExecutorService executors =
Executors.newFixedThreadPool(dataGen.getPartitionPaths().length);
@@ -2245,6 +2263,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
WriteClientTestUtils.startCommitWithTime(writeClient, newCommitTime);
List<WriteStatus> writeStatuses =
localWriteClient.insert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
+ localWriteClient.commit(newCommitTime, jsc.parallelize(writeStatuses));
});
futures.add(future);
}
@@ -2471,6 +2490,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
WriteClientTestUtils.startCommitWithTime(client, commitTimestamps[0]);
writeStatuses = client.bulkInsert(jsc.parallelize(records, 1),
commitTimestamps[0]).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(commitTimestamps[0], jsc.parallelize(writeStatuses));
// make all commits to inflight in metadata table. Still read should go
through, just that it may not return any data.
FileCreateUtilsLegacy.deleteDeltaCommit(basePath + "/.hoodie/metadata/",
commitTimestamps[0]);
@@ -2500,6 +2520,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
WriteClientTestUtils.startCommitWithTime(client, commitTimestamps[i]);
writeStatuses = client.bulkInsert(jsc.parallelize(records, 1),
commitTimestamps[i]).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(commitTimestamps[i], jsc.parallelize(writeStatuses));
}
// Ensure we can see files from each commit
@@ -2813,6 +2834,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
}
List<WriteStatus> writeStatuses =
client.upsert(jsc.parallelize(upsertRecords, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
validateMetadata(client);
newCommitTime = client.createNewInstantTime();
@@ -2820,6 +2842,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
records = dataGen.generateInserts(newCommitTime, 20);
writeStatuses = client.insert(jsc.parallelize(records, 1),
newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
validateMetadata(client);
// There is no way to simulate failed commit on the main dataset, hence
we simply delete the completed
@@ -2838,6 +2861,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records,
1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
validateMetadata(client);
}
}
@@ -2926,6 +2950,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
List<WriteStatus> writeStatuses =
client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
validateMetadata(client);
}
@@ -2946,6 +2971,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
List<WriteStatus> writeStatuses =
client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
}
// Metadata table is recreated, during bootstrapping of metadata table.
@@ -3040,6 +3066,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records,
1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
validateMetadata(client);
newCommitTime = client.createNewInstantTime();
@@ -3047,6 +3074,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
records = dataGen.generateInserts(newCommitTime, 5);
writeStatuses = client.bulkInsert(jsc.parallelize(records, 1),
newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
validateMetadata(client);
// There is no way to simulate failed commit on the main dataset, hence
we simply delete the completed
@@ -3065,6 +3093,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 5);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records,
1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
// Post rollback commit and metadata should be valid
validateMetadata(client);
@@ -3137,6 +3166,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
List<HoodieRecord> records =
nonPartitionedGenerator.generateInserts(newCommitTime, 10);
WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
List<WriteStatus> writeStatuses =
client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
validateMetadata(client);
List<String> metadataPartitions = metadata(client,
storage).getAllPartitionPaths();
@@ -3158,6 +3188,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
List<HoodieRecord> records =
nonPartitionedGenerator.generateInserts(newCommitTime, 10);
WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
List<WriteStatus> writeStatuses =
client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
validateMetadata(client);
List<String> metadataPartitions = metadata(client,
storage).getAllPartitionPaths();
@@ -3245,6 +3276,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records,
1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
validateMetadata(client);
Metrics metrics = Metrics.getInstance(writeConfig.getMetricsConfig(),
storage);
@@ -3291,6 +3323,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
List<WriteStatus> writeStatuses =
client.insert(jsc.parallelize(recordsFirstBatch, 1),
firstCommitTime).collect();
assertNoWriteErrors(writeStatuses);
+ client.commit(firstCommitTime, jsc.parallelize(writeStatuses));
commitTimestamps.add(firstCommitTime);
}
assertEquals(false,
@@ -3311,7 +3344,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
String secondCommitTime = client.createNewInstantTime();
List<HoodieRecord> recordsSecondBatch =
dataGen.generateInserts(secondCommitTime, 100);
WriteClientTestUtils.startCommitWithTime(client, secondCommitTime);
- assertThrows(HoodieException.class, () ->
client.insert(jsc.parallelize(recordsSecondBatch, 1), secondCommitTime));
+ assertThrows(HoodieException.class, () ->
client.insert(jsc.parallelize(recordsSecondBatch, 1),
secondCommitTime).collect());
}
}
@@ -3340,7 +3373,8 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
List<HoodieRecord> records = index == 0 ?
dataGen.generateInsertsForPartition(newCommitTime, 10, partition)
: dataGen.generateUniqueUpdates(newCommitTime, 5);
WriteClientTestUtils.startCommitWithTime(client, newCommitTime);
- client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+ List<WriteStatus> writeStatuses =
client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+ client.commit(newCommitTime, jsc.parallelize(writeStatuses));
}
}
assertEquals(metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(),
3);
@@ -3551,7 +3585,9 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
assertTrue(metadataWriteClient.scheduleCompactionAtInstant(compactionInstantTime,
Option.empty()));
HoodieWriteMetadata result =
metadataWriteClient.compact(compactionInstantTime);
metadataWriteClient.commitCompaction(compactionInstantTime, result,
Option.empty());
-
assertTrue(metaClient.reloadActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime));
+ // validate that compaction has been complete in metadata table.
+ assertTrue(((SparkHoodieBackedTableMetadataWriter)
metadataWriter).getTableMetadata().getMetadataMetaClient().reloadActiveTimeline().filterCompletedInstants()
+ .containsInstant(compactionInstantTime));
// verify metadata table
validateMetadata(client);
@@ -3585,7 +3621,8 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
// First commit
List<HoodieRecord> firstBatchOfrecords =
dataGen.generateInserts(firstCommitTime, 10);
WriteClientTestUtils.startCommitWithTime(client, firstCommitTime);
- client.insert(jsc.parallelize(firstBatchOfrecords, 1),
firstCommitTime).collect();
+ List<WriteStatus> writeStatuses =
client.insert(jsc.parallelize(firstBatchOfrecords, 1),
firstCommitTime).collect();
+ client.commit(firstCommitTime, jsc.parallelize(writeStatuses));
// Records got inserted and RI is initialized
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -3597,7 +3634,8 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
secondCommitTime = client.createNewInstantTime();
List<HoodieRecord> secondBatchOfrecords =
dataGen.generateInserts(secondCommitTime, 5);
WriteClientTestUtils.startCommitWithTime(client, secondCommitTime);
- client.bulkInsert(jsc.parallelize(secondBatchOfrecords, 1),
secondCommitTime).collect();
+ writeStatuses = client.bulkInsert(jsc.parallelize(secondBatchOfrecords,
1), secondCommitTime).collect();
+ client.commit(secondCommitTime, jsc.parallelize(writeStatuses));
assertEquals(secondBatchOfrecords.size(),
HoodieClientTestUtils.readCommit(writeConfig.getBasePath(),
engineContext.getSqlContext(), metaClient.reloadActiveTimeline(),
secondCommitTime, true, INSTANT_GENERATOR).count());
@@ -3619,7 +3657,8 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
String deleteTime = client.createNewInstantTime();
WriteClientTestUtils.startCommitWithTime(client, deleteTime);
- client.delete(jsc.parallelize(recordsToDelete,
1).map(HoodieRecord::getKey), deleteTime);
+ writeStatuses = client.delete(jsc.parallelize(recordsToDelete,
1).map(HoodieRecord::getKey), deleteTime).collect();
+ client.commit(deleteTime, jsc.parallelize(writeStatuses));
// RI should not return mappings for deleted records
metadataReader = HoodieTableMetadata.create(
@@ -3634,7 +3673,8 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
writeConfig)) {
// An empty delete to trigger compaction
String deleteTime = client.startCommit();
- client.delete(jsc.emptyRDD(), deleteTime);
+ List<WriteStatus> writeStatuses = client.delete(jsc.emptyRDD(),
deleteTime).collect();
+ client.commit(deleteTime, jsc.parallelize(writeStatuses));
HoodieTableMetadata metadataReader = HoodieTableMetadata.create(
context, storage, writeConfig.getMetadataConfig(),
writeConfig.getBasePath());
@@ -3648,7 +3688,8 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
// Adding records with the same keys after delete should work
String reinsertTime = client.startCommit();
- client.upsert(jsc.parallelize(recordsToDelete, 1),
reinsertTime).collect();
+ writeStatuses = client.upsert(jsc.parallelize(recordsToDelete, 1),
reinsertTime).collect();
+ client.commit(reinsertTime, jsc.parallelize(writeStatuses));
// New mappings should have been created for re-inserted records and
should map to the new commit time
metadataReader = HoodieTableMetadata.create(context, storage,
writeConfig.getMetadataConfig(), writeConfig.getBasePath());
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
similarity index 99%
rename from
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
index 9d5042fb51c..68e7909265f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
@@ -16,10 +16,11 @@
* limitations under the License.
*/
-package org.apache.hudi.client.functional;
+package org.apache.hudi.functional;
import org.apache.hudi.client.WriteClientTestUtils;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.functional.TestHoodieMetadataBase;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java
similarity index 99%
rename from
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
rename to
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java
index ec1f94526ab..365087cca9c 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.client.functional;
+package org.apache.hudi.functional;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -316,6 +317,7 @@ public class TestSavepointRestoreMergeOnRead extends
HoodieClientTestBase {
updateBatchWithoutCommit(client.createNewInstantTime(),
Objects.requireNonNull(baseRecordsToUpdate, "The records to update
should not be null"));
// rollback the delta_commit
+ metaClient = HoodieTableMetaClient.reload(metaClient);
assertTrue(client.rollbackFailedWrites(metaClient), "The last
delta_commit should be rolled back");
// another update
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
index f7bd900cd41..8647fbcbb11 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
@@ -63,7 +63,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestSparkSortAndSizeClustering extends
HoodieSparkClientTestHarness {
-
private HoodieWriteConfig config;
private HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0);
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkRollback.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkRollback.java
index f774550b8ef..1f4416ea354 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkRollback.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkRollback.java
@@ -24,7 +24,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteClientTestUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.client.functional.TestHoodieBackedMetadata;
+import org.apache.hudi.functional.TestHoodieBackedMetadata;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;