This is an automated email from the ASF dual-hosted git repository.
vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 54276a957b82 refactor: modularize long test methods in
TestHoodieClientOnCopyOnWriteStorage (#18377)
54276a957b82 is described below
commit 54276a957b8254510b118ddeee29dfd1f149560b
Author: yaojiejia <[email protected]>
AuthorDate: Mon Mar 30 04:36:29 2026 -0400
refactor: modularize long test methods in
TestHoodieClientOnCopyOnWriteStorage (#18377)
* init changes
* minor fix
---
.../TestHoodieClientOnCopyOnWriteStorage.java | 496 ++++++++++-----------
1 file changed, 241 insertions(+), 255 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index fcb08a2e2706..6f873d2d5059 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -60,7 +60,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.timeline.TimelineFactory;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
-import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.testutils.FileCreateUtilsLegacy;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
@@ -470,61 +469,52 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
@Override
protected void testMergeHandle(HoodieWriteConfig config) throws IOException {
- final String instantTime = "007";
HoodieTableMetaClient metaClient =
HoodieClientTestUtils.createMetaClient(jsc, basePath);
HoodieTable table = getHoodieTable(metaClient, config);
Pair<String, String> partitionAndBaseFilePaths =
getPartitionAndBaseFilePathsFromLatestCommitMetadata(metaClient);
String partitionPath = partitionAndBaseFilePaths.getLeft();
String baseFilePath = partitionAndBaseFilePaths.getRight();
jsc.parallelize(Arrays.asList(1)).map(e -> {
-
HoodieBaseFile baseFile = new HoodieBaseFile(baseFilePath);
+ performMergeValidationCheck(config, "007", table, partitionPath,
baseFile, false);
- HoodieWriteMergeHandle handle = null;
- try {
- handle = new HoodieWriteMergeHandle(config, instantTime, table, new
HashMap<>(),
- partitionPath, FSUtils.getFileId(baseFile.getFileName()),
baseFile, new SparkTaskContextSupplier(),
- config.populateMetaFields() ? Option.empty() :
- Option.of((BaseKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(config.getProps())));
- WriteStatus writeStatus = new WriteStatus(false, 0.0);
- writeStatus.setStat(new HoodieWriteStat());
- writeStatus.getStat().setNumWrites(0);
- handle.performMergeDataValidationCheck(writeStatus);
- } catch (HoodieCorruptedDataException e1) {
+ config.getProps().setProperty("hoodie.merge.data.validation.enabled",
"true");
+ HoodieWriteConfig cfg2 =
HoodieWriteConfig.newBuilder().withProps(config.getProps()).build();
+ performMergeValidationCheck(cfg2, "006", table, partitionPath, baseFile,
true);
+ return true;
+ }).collect();
+ }
+
+ private static void performMergeValidationCheck(HoodieWriteConfig cfg,
String instantTime, HoodieTable table,
+ String partitionPath,
HoodieBaseFile baseFile,
+ boolean expectCorruptedException)
throws Exception {
+ HoodieWriteMergeHandle handle = null;
+ try {
+ Option<BaseKeyGenerator> keyGenOption = cfg.populateMetaFields() ?
Option.empty()
+ : Option.of((BaseKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(cfg.getProps()));
+ handle = new HoodieWriteMergeHandle(cfg, instantTime, table, new
HashMap<>(),
+ partitionPath, FSUtils.getFileId(baseFile.getFileName()), baseFile,
+ new SparkTaskContextSupplier(), keyGenOption);
+ WriteStatus writeStatus = new WriteStatus(false, 0.0);
+ writeStatus.setStat(new HoodieWriteStat());
+ writeStatus.getStat().setNumWrites(0);
+ handle.performMergeDataValidationCheck(writeStatus);
+ if (expectCorruptedException) {
+ fail("Expected HoodieCorruptedDataException was not thrown");
+ }
+ } catch (HoodieCorruptedDataException e) {
+ if (!expectCorruptedException) {
fail("Exception not expected because merge validation check is
disabled");
- } finally {
- if (handle != null) {
- handle.close();
- }
}
-
- handle = null;
- try {
- final String newInstantTime = "006";
- config.getProps().setProperty("hoodie.merge.data.validation.enabled",
"true");
- HoodieWriteConfig cfg2 =
HoodieWriteConfig.newBuilder().withProps(config.getProps()).build();
- handle = new HoodieWriteMergeHandle(cfg2, newInstantTime, table, new
HashMap<>(),
- partitionPath, FSUtils.getFileId(baseFile.getFileName()),
baseFile, new SparkTaskContextSupplier(),
- config.populateMetaFields() ? Option.empty() :
- Option.of((BaseKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(config.getProps())));
- WriteStatus writeStatus = new WriteStatus(false, 0.0);
- writeStatus.setStat(new HoodieWriteStat());
- writeStatus.getStat().setNumWrites(0);
- handle.performMergeDataValidationCheck(writeStatus);
- fail("The above line should have thrown an exception");
- } catch (HoodieCorruptedDataException e2) {
- // expected
- } finally {
- if (handle != null) {
- try {
- handle.close();
- } catch (Exception ex) {
- // ignore exception from validation check
- }
+ } finally {
+ if (handle != null) {
+ try {
+ handle.close();
+ } catch (Exception ex) {
+ // ignore exception from validation check
}
}
- return true;
- }).collect();
+ }
}
@Test
@@ -719,6 +709,114 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
return Pair.of(recreatedStatuses, inserts);
}
+ private List<WriteStatus> upsertBatchRecords(SparkRDDWriteClient client,
String commitTime,
+ List<HoodieRecord> records, int
numSlices) throws IOException {
+ WriteClientTestUtils.startCommitWithTime(client, commitTime);
+ List<WriteStatus> statusList = client.upsert(jsc.parallelize(records,
numSlices), commitTime).collect();
+ client.commit(commitTime, jsc.parallelize(statusList), Option.empty(),
COMMIT_ACTION, Collections.emptyMap(), Option.empty());
+ assertNoWriteErrors(statusList);
+ return statusList;
+ }
+
+ private void verifyExpandedFile(FileFormatUtils fileUtils, List<WriteStatus>
statusList,
+ String expectedFileId, String
prevCommitTime, int expectedRecordCount,
+ Set<String> keys1, Set<String> keys2, String
commitTime) throws IOException {
+ assertEquals(1, statusList.size(), "Just 1 file needs to be updated.");
+ assertEquals(expectedFileId, statusList.get(0).getFileId(), "Existing file
should be expanded");
+ assertEquals(prevCommitTime, statusList.get(0).getStat().getPrevCommit(),
"Existing file should be expanded");
+ StoragePath newFile = new StoragePath(basePath,
statusList.get(0).getStat().getPath());
+ assertEquals(expectedRecordCount, fileUtils.readRowKeys(storage,
newFile).size(),
+ "file should contain " + expectedRecordCount + " records");
+ for (GenericRecord record : fileUtils.readAvroRecords(storage, newFile)) {
+ String recordKey =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+ assertEquals(commitTime,
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(),
+ "only expect " + commitTime);
+ assertTrue(keys2.contains(recordKey) || keys1.contains(recordKey),
+ "key expected to be part of " + commitTime);
+ }
+ }
+
+ private void verifyTwoFileCommitDistribution(FileFormatUtils fileUtils,
List<HoodieBaseFile> files,
+ String file1Id, Set<String>
keys2, Set<String> keys3,
+ String commitTime3, int
expectedUpdates) throws IOException {
+ int numTotalInsertsInCommit3 = 0;
+ int numTotalUpdatesInCommit3 = 0;
+ for (HoodieBaseFile file : files) {
+ if (file.getFileName().contains(file1Id)) {
+ assertEquals(commitTime3, file.getCommitTime(), "Existing file should
be expanded");
+ List<GenericRecord> records = fileUtils.readAvroRecords(storage, new
StoragePath(file.getPath()));
+ for (GenericRecord record : records) {
+ String recordKey =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+ String recordCommitTime =
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+ if (recordCommitTime.equals(commitTime3)) {
+ if (keys2.contains(recordKey)) {
+ keys2.remove(recordKey);
+ numTotalUpdatesInCommit3++;
+ } else {
+ numTotalInsertsInCommit3++;
+ }
+ }
+ }
+ assertEquals(0, keys2.size(), "All keys added in commit 2 must be
updated in commit3 correctly");
+ } else {
+ assertEquals(commitTime3, file.getCommitTime(), "New file must be
written for commit 3");
+ List<GenericRecord> records = fileUtils.readAvroRecords(storage, new
StoragePath(file.getPath()));
+ for (GenericRecord record : records) {
+ String recordKey =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+ assertEquals(commitTime3,
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(),
+ "only expect commit3");
+ assertTrue(keys3.contains(recordKey), "key expected to be part of
commit3");
+ }
+ numTotalInsertsInCommit3 += records.size();
+ }
+ }
+ assertEquals(expectedUpdates, numTotalUpdatesInCommit3, "Total updates in
commit3 must add up");
+ assertEquals(keys3.size(), numTotalInsertsInCommit3, "Total inserts in
commit3 must add up");
+ }
+
+ private List<String> getFileGroupIds(HoodieSparkCopyOnWriteTable table,
String partitionPath) {
+ return table.getFileSystemView().getAllFileGroups(partitionPath)
+ .map(fileGroup ->
fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
+ }
+
+ private void verifyInsertExpandedFile(FileFormatUtils fileUtils,
List<WriteStatus> statuses,
+ String expectedFileId, String
prevCommitTime, int expectedRecordCount,
+ Set<String> keys1, Set<String> keys2,
+ String commitTime1, String
commitTime2) throws IOException {
+ assertEquals(expectedFileId, statuses.get(0).getFileId(), "Existing file
should be expanded");
+ assertEquals(prevCommitTime, statuses.get(0).getStat().getPrevCommit(),
"Existing file should be expanded");
+ StoragePath newFile = new StoragePath(basePath,
statuses.get(0).getStat().getPath());
+ assertEquals(expectedRecordCount, fileUtils.readRowKeys(storage,
newFile).size(),
+ "file should contain " + expectedRecordCount + " records");
+ for (GenericRecord record : fileUtils.readAvroRecords(storage, newFile)) {
+ String recordKey =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+ String recCommitTime =
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+ assertTrue(commitTime1.equals(recCommitTime) ||
commitTime2.equals(recCommitTime),
+ "Record expected to be part of commit 1 or commit2");
+ assertTrue(keys2.contains(recordKey) || keys1.contains(recordKey),
+ "key expected to be part of commit 1 or commit2");
+ }
+ }
+
+ private void verifyAllFilesAtCommit(FileFormatUtils fileUtils,
List<HoodieBaseFile> files,
+ String expectedCommitTime, int
expectedTotalRecords) throws IOException {
+ int totalRecords = 0;
+ for (HoodieBaseFile file : files) {
+ assertEquals(expectedCommitTime, file.getCommitTime(), "All files must
be at " + expectedCommitTime);
+ totalRecords += fileUtils.readAvroRecords(storage, new
StoragePath(file.getPath())).size();
+ }
+ assertEquals(expectedTotalRecords, totalRecords, "Total number of records
must add up");
+ }
+
+ private void insertCommitWithSchema(SparkRDDWriteClient client,
HoodieTestDataGenerator gen,
+ int count, String schema) {
+ String commitTime = client.startCommit();
+ List<HoodieRecord> batch = gen.generateInsertsAsPerSchema(commitTime,
count, schema);
+ JavaRDD<HoodieRecord> records =
context.getJavaSparkContext().parallelize(batch, 1);
+ JavaRDD<WriteStatus> writeStatuses = client.insert(records, commitTime);
+ client.commit(commitTime, writeStatuses);
+ }
+
@Test
public void testUpdateRejectForClustering() throws IOException {
final String testPartitionPath = "2016/09/26";
@@ -734,8 +832,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
//1. insert to generate 2 file group
Pair<JavaRDD<WriteStatus>, List<HoodieRecord>> upsertResult =
insertBatchRecords(client, "001", 600, 2, 1, SparkRDDWriteClient::upsert);
List<HoodieRecord> inserts1 = upsertResult.getValue();
- List<String> fileGroupIds1 =
table.getFileSystemView().getAllFileGroups(testPartitionPath)
- .map(fileGroup ->
fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
+ List<String> fileGroupIds1 = getFileGroupIds(table, testPartitionPath);
assertEquals(2, fileGroupIds1.size());
// 2. generate clustering plan for fileGroupIds1 file groups
@@ -746,8 +843,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
// 3. insert one record without triggering the update-reject exception; it
should not merge into the small file, just generate a new file group
insertBatchRecords(client, "003", 1, 1, 1, SparkRDDWriteClient::upsert);
- List<String> fileGroupIds2 =
table.getFileSystemView().getAllFileGroups(testPartitionPath)
- .map(fileGroup ->
fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
+ List<String> fileGroupIds2 = getFileGroupIds(table, testPartitionPath);
assertEquals(3, fileGroupIds2.size());
// 4. update one record for the two file groups under clustering, which
should throw the reject-update exception
@@ -765,9 +861,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
insertBatchRecords(client, "005", 1, 1, 1,
SparkRDDWriteClient::upsert).getKey();
fileGroupIds2.removeAll(fileGroupIds1);
assertEquals(fileGroupIds2.get(0), statuses.collect().get(0).getFileId());
- List<String> firstInsertFileGroupIds4 =
table.getFileSystemView().getAllFileGroups(testPartitionPath)
- .map(fileGroup ->
fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
- assertEquals(3, firstInsertFileGroupIds4.size());
+ assertEquals(3, getFileGroupIds(table, testPartitionPath).size());
}
/**
@@ -777,112 +871,43 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
public void testSmallInsertHandlingForUpserts() throws Exception {
final String testPartitionPath = "2016/09/26";
final int insertSplitLimit = 100;
- // setup the small file handling params
// hold upto 200 records max
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit,
TRIP_EXAMPLE_SCHEMA, dataGen.getEstimatedFileSizeInBytes(150));
-
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config);
FileFormatUtils fileUtils = getFileUtilsInstance(metaClient);
// Inserts => will write file1
String commitTime1 = "001";
- WriteClientTestUtils.startCommitWithTime(client, commitTime1);
List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1,
insertSplitLimit); // this writes ~500kb
Set<String> keys1 = recordsToRecordKeySet(inserts1);
-
- JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
- List<WriteStatus> statusList = client.upsert(insertRecordsRDD1,
commitTime1).collect();
- writeClient.commit(commitTime1, jsc.parallelize(statusList),
Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());
- assertNoWriteErrors(statusList);
-
- assertEquals(1, statusList.size(), "Just 1 file needs to be added.");
- String file1 = statusList.get(0).getFileId();
+ List<WriteStatus> statusList1 = upsertBatchRecords(client, commitTime1,
inserts1, 1);
+ assertEquals(1, statusList1.size(), "Just 1 file needs to be added.");
+ String file1 = statusList1.get(0).getFileId();
assertEquals(100,
- fileUtils.readRowKeys(storage, new StoragePath(basePath,
statusList.get(0).getStat().getPath()))
+ fileUtils.readRowKeys(storage, new StoragePath(basePath,
statusList1.get(0).getStat().getPath()))
.size(), "file should contain 100 records");
// Update + Inserts such that they just expand file1
String commitTime2 = "002";
- WriteClientTestUtils.startCommitWithTime(client, commitTime2);
List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 40);
Set<String> keys2 = recordsToRecordKeySet(inserts2);
- List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>();
- insertsAndUpdates2.addAll(inserts2);
+ List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>(inserts2);
insertsAndUpdates2.addAll(dataGen.generateUpdates(commitTime2, inserts1));
-
- JavaRDD<HoodieRecord> insertAndUpdatesRDD2 =
jsc.parallelize(insertsAndUpdates2, 1);
- statusList = client.upsert(insertAndUpdatesRDD2, commitTime2).collect();
- client.commit(commitTime2, jsc.parallelize(statusList), Option.empty(),
COMMIT_ACTION, Collections.emptyMap(), Option.empty());
- assertNoWriteErrors(statusList);
-
- assertEquals(1, statusList.size(), "Just 1 file needs to be updated.");
- assertEquals(file1, statusList.get(0).getFileId(), "Existing file should
be expanded");
- assertEquals(commitTime1, statusList.get(0).getStat().getPrevCommit(),
"Existing file should be expanded");
- StoragePath newFile = new StoragePath(basePath,
statusList.get(0).getStat().getPath());
- assertEquals(140, fileUtils.readRowKeys(storage, newFile).size(),
- "file should contain 140 records");
-
- List<GenericRecord> records = fileUtils.readAvroRecords(storage, newFile);
- for (GenericRecord record : records) {
- String recordKey =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
- assertEquals(commitTime2,
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(), "only expect
commit2");
- assertTrue(keys2.contains(recordKey) || keys1.contains(recordKey), "key
expected to be part of commit2");
- }
+ List<WriteStatus> statusList2 = upsertBatchRecords(client, commitTime2,
insertsAndUpdates2, 1);
+ verifyExpandedFile(fileUtils, statusList2, file1, commitTime1, 140, keys1,
keys2, commitTime2);
// update + inserts such that file1 is updated and expanded, a new file2
is created.
String commitTime3 = "003";
- WriteClientTestUtils.startCommitWithTime(client, commitTime3);
- List<HoodieRecord> insertsAndUpdates3 =
dataGen.generateInserts(commitTime3, 200);
+ List<HoodieRecord> insertsAndUpdates3 = new
ArrayList<>(dataGen.generateInserts(commitTime3, 200));
Set<String> keys3 = recordsToRecordKeySet(insertsAndUpdates3);
- List<HoodieRecord> updates3 = dataGen.generateUpdates(commitTime3,
inserts2);
- insertsAndUpdates3.addAll(updates3);
-
- JavaRDD<HoodieRecord> insertAndUpdatesRDD3 =
jsc.parallelize(insertsAndUpdates3, 1);
- statusList = client.upsert(insertAndUpdatesRDD3, commitTime3).collect();
- client.commit(commitTime3, jsc.parallelize(statusList), Option.empty(),
COMMIT_ACTION, Collections.emptyMap(), Option.empty());
- assertNoWriteErrors(statusList);
- assertEquals(2, statusList.size(), "2 files needs to be committed.");
- HoodieTableMetaClient metadata = createMetaClient();
-
- HoodieTable table = getHoodieTable(metadata, config);
- BaseFileOnlyView fileSystemView = table.getBaseFileOnlyView();
- List<HoodieBaseFile> files =
- fileSystemView.getLatestBaseFilesBeforeOrOn(testPartitionPath,
commitTime3).collect(Collectors.toList());
- int numTotalInsertsInCommit3 = 0;
- int numTotalUpdatesInCommit3 = 0;
- for (HoodieBaseFile file : files) {
- if (file.getFileName().contains(file1)) {
- assertEquals(commitTime3, file.getCommitTime(), "Existing file should
be expanded");
- records = fileUtils.readAvroRecords(storage, new
StoragePath(file.getPath()));
- for (GenericRecord record : records) {
- String recordKey =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
- String recordCommitTime =
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
- if (recordCommitTime.equals(commitTime3)) {
- if (keys2.contains(recordKey)) {
- keys2.remove(recordKey);
- numTotalUpdatesInCommit3++;
- } else {
- numTotalInsertsInCommit3++;
- }
- }
- }
- assertEquals(0, keys2.size(), "All keys added in commit 2 must be
updated in commit3 correctly");
- } else {
- assertEquals(commitTime3, file.getCommitTime(), "New file must be
written for commit 3");
- records = fileUtils.readAvroRecords(storage, new
StoragePath(file.getPath()));
- for (GenericRecord record : records) {
- String recordKey =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
- assertEquals(commitTime3,
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(),
- "only expect commit3");
- assertTrue(keys3.contains(recordKey), "key expected to be part of
commit3");
- }
- numTotalInsertsInCommit3 += records.size();
- }
- }
- assertEquals(numTotalUpdatesInCommit3, inserts2.size(), "Total updates in
commit3 must add up");
- assertEquals(numTotalInsertsInCommit3, keys3.size(), "Total inserts in
commit3 must add up");
+ insertsAndUpdates3.addAll(dataGen.generateUpdates(commitTime3, inserts2));
+ List<WriteStatus> statusList3 = upsertBatchRecords(client, commitTime3,
insertsAndUpdates3, 1);
+ assertEquals(2, statusList3.size(), "2 files needs to be committed.");
+ List<HoodieBaseFile> files = getHoodieTable(createMetaClient(),
config).getBaseFileOnlyView()
+ .getLatestBaseFilesBeforeOrOn(testPartitionPath,
commitTime3).collect(Collectors.toList());
+ verifyTwoFileCommitDistribution(fileUtils, files, file1, keys2, keys3,
commitTime3, inserts2.size());
}
/**
@@ -916,21 +941,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
insertResult = insertBatchRecords(client, commitTime2, 40, 1, 1,
SparkRDDWriteClient::insert);
Set<String> keys2 = recordsToRecordKeySet(insertResult.getRight());
statuses = insertResult.getLeft().collect();
- assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be
expanded");
- assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(),
"Existing file should be expanded");
-
- StoragePath newFile = new StoragePath(basePath,
statuses.get(0).getStat().getPath());
- assertEquals(140, fileUtils.readRowKeys(storage, newFile).size(),
- "file should contain 140 records");
- List<GenericRecord> records = fileUtils.readAvroRecords(storage, newFile);
- for (GenericRecord record : records) {
- String recordKey =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
- String recCommitTime =
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
- assertTrue(commitTime1.equals(recCommitTime) ||
commitTime2.equals(recCommitTime),
- "Record expected to be part of commit 1 or commit2");
- assertTrue(keys2.contains(recordKey) || keys1.contains(recordKey),
- "key expected to be part of commit 1 or commit2");
- }
+ verifyInsertExpandedFile(fileUtils, statuses, file1, commitTime1, 140,
keys1, keys2, commitTime1, commitTime2);
// Lots of inserts such that file1 is updated and expanded, a new file2 is
created.
String commitTime3 = "003";
@@ -940,19 +951,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
fileUtils.readRowKeys(storage, new StoragePath(basePath,
statuses.get(0).getStat().getPath())).size()
+ fileUtils.readRowKeys(storage, new StoragePath(basePath,
statuses.get(1).getStat().getPath())).size(),
"file should contain 340 records");
-
- HoodieTableMetaClient metaClient = createMetaClient();
- HoodieTable table = getHoodieTable(metaClient, config);
- List<HoodieBaseFile> files = table.getBaseFileOnlyView()
+ List<HoodieBaseFile> files = getHoodieTable(createMetaClient(),
config).getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(testPartitionPath,
commitTime3).collect(Collectors.toList());
assertEquals(2, files.size(), "Total of 2 valid data files");
-
- int totalInserts = 0;
- for (HoodieBaseFile file : files) {
- assertEquals(commitTime3, file.getCommitTime(), "All files must be at
commit 3");
- totalInserts += fileUtils.readAvroRecords(storage, new
StoragePath(file.getPath())).size();
- }
- assertEquals(340, totalInserts, "Total number of records must add up");
+ verifyAllFilesAtCommit(fileUtils, files, commitTime3, 340);
}
/**
@@ -972,19 +974,14 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
// Inserts => will write file1
String commitTime1 = "001";
- WriteClientTestUtils.startCommitWithTime(client, commitTime1);
List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1,
insertSplitLimit); // this writes ~500kb
Set<String> keys1 = recordsToRecordKeySet(inserts1);
List<String> keysSoFar = new ArrayList<>(keys1);
- JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
- List<WriteStatus> statusList = client.upsert(insertRecordsRDD1,
commitTime1).collect();
- client.commit(commitTime1, jsc.parallelize(statusList), Option.empty(),
COMMIT_ACTION, Collections.emptyMap(), Option.empty());
- assertNoWriteErrors(statusList);
-
- assertEquals(1, statusList.size(), "Just 1 file needs to be added.");
- String file1 = statusList.get(0).getFileId();
+ List<WriteStatus> statusList1 = upsertBatchRecords(client, commitTime1,
inserts1, 1);
+ assertEquals(1, statusList1.size(), "Just 1 file needs to be added.");
+ String file1 = statusList1.get(0).getFileId();
assertEquals(100, getFileUtilsInstance(metaClient).readRowKeys(
- storage, new StoragePath(basePath,
statusList.get(0).getStat().getPath())).size(), "file should contain 100
records");
+ storage, new StoragePath(basePath,
statusList1.get(0).getStat().getPath())).size(), "file should contain 100
records");
// Delete 20 among 100 inserted
testDeletes(client, inserts1, 20, file1, "002", 80, keysSoFar);
@@ -1007,10 +1004,10 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
List<HoodieRecord> dummyInserts3 = dataGen.generateInserts(commitTime6,
20);
List<HoodieKey> hoodieKeysToDelete3 =
randomSelectAsHoodieKeys(dummyInserts3, 20);
JavaRDD<HoodieKey> deleteKeys3 = jsc.parallelize(hoodieKeysToDelete3, 1);
- statusList = client.delete(deleteKeys3, commitTime6).collect();
- client.commit(commitTime6, jsc.parallelize(statusList), Option.empty(),
COMMIT_ACTION, Collections.emptyMap(), Option.empty());
- assertNoWriteErrors(statusList);
- assertEquals(0, statusList.size(), "Just 0 write status for delete.");
+ List<WriteStatus> nonExistentDeleteStatuses = client.delete(deleteKeys3,
commitTime6).collect();
+ client.commit(commitTime6, jsc.parallelize(nonExistentDeleteStatuses),
Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());
+ assertNoWriteErrors(nonExistentDeleteStatuses);
+ assertEquals(0, nonExistentDeleteStatuses.size(), "Just 0 write status for
delete.");
assertTheEntireDatasetHasAllRecordsStill(150);
@@ -1211,20 +1208,11 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
HoodieTestDataGenerator dataGen =
new HoodieTestDataGenerator(new
String[]{DEFAULT_FIRST_PARTITION_PATH});
- String firstCommit = client.startCommit();
- List<HoodieRecord> firstBatch =
dataGen.generateInsertsAsPerSchema(firstCommit, 10, schemaStr);
- JavaRDD<HoodieRecord> records =
context.getJavaSparkContext().parallelize(firstBatch, 1);
- JavaRDD<WriteStatus> writeStatuses = client.insert(records, firstCommit);
- client.commit(firstCommit, writeStatuses);
-
- // Not create another commit on DEFAULT_SECOND_PARTITION_PATH partition
- // with schema that contains new columns.
- String secondCommit = client.startCommit();
+ insertCommitWithSchema(client, dataGen, 10, schemaStr);
+
+ // Create another commit with a schema that contains new columns.
String latestSchemaStr = TRIP_EXAMPLE_SCHEMA_EVOLVED_1;
- List<HoodieRecord> secondBatch =
dataGen.generateInsertsAsPerSchema(secondCommit, 10, latestSchemaStr);
- records = context.getJavaSparkContext().parallelize(secondBatch, 1);
- writeStatuses = client.insert(records, secondCommit);
- client.commit(secondCommit, writeStatuses);
+ insertCommitWithSchema(client, dataGen, 10, latestSchemaStr);
// Create cluster commit on DEFAULT_FIRST_PARTITION_PATH partition
// Here pass in precommit validator as SqlQueryEqualityPreCommitValidator
and check that trip_id is not null.
@@ -1479,15 +1467,9 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
Set<String> deletePartitionReplaceFileIds1 =
deletePartitionWithCommit(client, commitTime4,
Arrays.asList(DEFAULT_FIRST_PARTITION_PATH));
assertEquals(batch1Buckets, deletePartitionReplaceFileIds1);
- List<HoodieBaseFile> baseFiles =
HoodieClientTestUtils.getLatestBaseFiles(basePath, storage,
- String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH));
- assertEquals(0, baseFiles.size());
- baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, storage,
- String.format("%s/%s/*", basePath, DEFAULT_SECOND_PARTITION_PATH));
- assertTrue(baseFiles.size() > 0);
- baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, storage,
- String.format("%s/%s/*", basePath, DEFAULT_THIRD_PARTITION_PATH));
- assertTrue(baseFiles.size() > 0);
+ assertEquals(0,
getLatestBaseFilesForPartition(DEFAULT_FIRST_PARTITION_PATH).size());
+
assertTrue(getLatestBaseFilesForPartition(DEFAULT_SECOND_PARTITION_PATH).size()
> 0);
+
assertTrue(getLatestBaseFilesForPartition(DEFAULT_THIRD_PARTITION_PATH).size()
> 0);
// delete DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH
String commitTime5 = "005";
@@ -1498,11 +1480,15 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
expectedFileId.addAll(batch3Buckets);
assertEquals(expectedFileId, deletePartitionReplaceFileIds2);
- baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, storage,
+ assertEquals(0, HoodieClientTestUtils.getLatestBaseFiles(basePath, storage,
String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH),
String.format("%s/%s/*", basePath, DEFAULT_SECOND_PARTITION_PATH),
- String.format("%s/%s/*", basePath, DEFAULT_THIRD_PARTITION_PATH));
- assertEquals(0, baseFiles.size());
+ String.format("%s/%s/*", basePath,
DEFAULT_THIRD_PARTITION_PATH)).size());
+ }
+
+ private List<HoodieBaseFile> getLatestBaseFilesForPartition(String
partitionPath) {
+ return HoodieClientTestUtils.getLatestBaseFiles(basePath, storage,
+ String.format("%s/%s/*", basePath, partitionPath));
}
private Pair<Set<String>, List<HoodieRecord>> testUpdates(String
instantTime, SparkRDDWriteClient client,
@@ -1664,28 +1650,46 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
HoodieClientTestUtils.read(jsc, basePath, sqlContext, storage,
fullPartitionPaths).count(), "Must contain " + totalRecords + " records");
}
- @Test
- public void testClusteringCommitInPresenceOfInflightCommit() throws
Exception {
+ private Properties createOccProperties() {
Properties properties = getDisabledRowWriterProperties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath +
"/.hoodie/.locks");
- HoodieLockConfig lockConfig = createLockConfig(new
PreferWriterConflictResolutionStrategy());
- HoodieCleanConfig cleanConfig =
createCleanConfig(HoodieFailedWritesCleaningPolicy.LAZY, false);
- HoodieWriteConfig insertWriteConfig = getConfigBuilder()
+ return properties;
+ }
+
+ private HoodieWriteConfig buildOccWriteConfig(Properties properties,
HoodieCleanConfig cleanConfig,
+ ConflictResolutionStrategy
strategy) {
+ return getConfigBuilder()
.withCleanConfig(cleanConfig)
- .withLockConfig(lockConfig)
+ .withLockConfig(createLockConfig(strategy))
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withProperties(properties)
.build();
- SparkRDDWriteClient client = getHoodieWriteClient(insertWriteConfig);
+ }
- // Create a base commit on a file.
- int numRecords = 200;
+ private Pair<String, HoodieTestDataGenerator>
seedTableWithFirstCommit(SparkRDDWriteClient client,
+ int
numRecords) throws Exception {
String firstCommit = WriteClientTestUtils.createNewInstantTime();
- String partitionStr = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
- HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new
String[] {partitionStr});
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(
+ new String[]{HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH});
writeBatch(client, firstCommit, "000", Option.of(Arrays.asList("000")),
"000",
numRecords, dataGenerator::generateInserts,
SparkRDDWriteClient::insert, true, numRecords, numRecords,
1, INSTANT_GENERATOR);
+ return Pair.of(firstCommit, dataGenerator);
+ }
+
+ @Test
+ public void testClusteringCommitInPresenceOfInflightCommit() throws
Exception {
+ Properties properties = createOccProperties();
+ HoodieLockConfig lockConfig = createLockConfig(new
PreferWriterConflictResolutionStrategy());
+ HoodieCleanConfig cleanConfig =
createCleanConfig(HoodieFailedWritesCleaningPolicy.LAZY, false);
+ HoodieWriteConfig insertWriteConfig = buildOccWriteConfig(properties,
cleanConfig,
+ new PreferWriterConflictResolutionStrategy());
+ SparkRDDWriteClient client = getHoodieWriteClient(insertWriteConfig);
+
+ // Create a base commit on a file.
+ Pair<String, HoodieTestDataGenerator> seed =
seedTableWithFirstCommit(client, 200);
+ String firstCommit = seed.getLeft();
+ HoodieTestDataGenerator dataGenerator = seed.getRight();
// Do an upsert operation without autocommit.
String inflightCommit = WriteClientTestUtils.createNewInstantTime();
@@ -1722,25 +1726,16 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
@Test
public void testIngestionCommitInPresenceOfCompletedClusteringCommit()
throws Exception {
- Properties properties = getDisabledRowWriterProperties();
- properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath +
"/.hoodie/.locks");
+ Properties properties = createOccProperties();
HoodieCleanConfig cleanConfig =
createCleanConfig(HoodieFailedWritesCleaningPolicy.LAZY, false);
- HoodieWriteConfig insertWriteConfig = getConfigBuilder()
- .withCleanConfig(cleanConfig)
- .withLockConfig(createLockConfig(new
PreferWriterConflictResolutionStrategy()))
-
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
- .withProperties(properties)
- .build();
+ HoodieWriteConfig insertWriteConfig = buildOccWriteConfig(properties,
cleanConfig,
+ new PreferWriterConflictResolutionStrategy());
SparkRDDWriteClient client = getHoodieWriteClient(insertWriteConfig);
// Create a base commit on a file.
- int numRecords = 200;
- String firstCommit = WriteClientTestUtils.createNewInstantTime();
- String partitionStr = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
- HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new
String[] {partitionStr});
- writeBatch(client, firstCommit, "000", Option.of(Arrays.asList("000")),
"000",
- numRecords, dataGenerator::generateInserts,
SparkRDDWriteClient::insert, true, numRecords, numRecords,
- 1, INSTANT_GENERATOR);
+ Pair<String, HoodieTestDataGenerator> seed =
seedTableWithFirstCommit(client, 200);
+ String firstCommit = seed.getLeft();
+ HoodieTestDataGenerator dataGenerator = seed.getRight();
// Create and temporarily block a lower timestamp for ingestion.
String inflightCommit = WriteClientTestUtils.createNewInstantTime();
@@ -1795,24 +1790,16 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
.filterCompletedInstants().lastInstant().get().requestedTime());
// Do insert-overwrite operation on the existing partitions, without
committing the data.
- String secondCommit = WriteClientTestUtils.createNewInstantTime();
- WriteClientTestUtils.startCommitWithTime(client, secondCommit,
REPLACE_COMMIT_ACTION);
- List<HoodieRecord> records1 = dataGen.generateInserts(secondCommit, 10);
- JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(records1, 1);
- HoodieWriteResult result1 = client.insertOverwrite(writeRecords1,
secondCommit);
- assertEquals(secondCommit, metaClient.reloadActiveTimeline()
- .filterInflightsAndRequested().lastInstant().get().requestedTime());
+ Pair<String, HoodieWriteResult> inflightOverwrite1 =
startInflightInsertOverwrite(client, 10);
+ String secondCommit = inflightOverwrite1.getLeft();
+ HoodieWriteResult result1 = inflightOverwrite1.getRight();
// Create second writer and do another insert-overwrite operation on the
existing partitions,
// without committing the data.
SparkRDDWriteClient client2 = new SparkRDDWriteClient(context,
insertWriteConfig);
- String thirdCommit = WriteClientTestUtils.createNewInstantTime();
- WriteClientTestUtils.startCommitWithTime(client2, thirdCommit,
REPLACE_COMMIT_ACTION);
- List<HoodieRecord> records2 = dataGen.generateInserts(thirdCommit, 10);
- JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(records2, 1);
- HoodieWriteResult result2 = client2.insertOverwrite(writeRecords2,
thirdCommit);
- assertEquals(thirdCommit, metaClient.reloadActiveTimeline()
- .filterInflightsAndRequested().lastInstant().get().requestedTime());
+ Pair<String, HoodieWriteResult> inflightOverwrite2 =
startInflightInsertOverwrite(client2, 10);
+ String thirdCommit = inflightOverwrite2.getLeft();
+ HoodieWriteResult result2 = inflightOverwrite2.getRight();
// Complete first insert-overwrite operation.
client.commit(secondCommit, result1.getWriteStatuses(),
@@ -1884,6 +1871,17 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
}
}
+ private Pair<String, HoodieWriteResult>
startInflightInsertOverwrite(SparkRDDWriteClient client,
+ int
numRecords) {
+ String commitTime = WriteClientTestUtils.createNewInstantTime();
+ WriteClientTestUtils.startCommitWithTime(client, commitTime,
REPLACE_COMMIT_ACTION);
+ List<HoodieRecord> records = dataGen.generateInserts(commitTime,
numRecords);
+ HoodieWriteResult result = client.insertOverwrite(jsc.parallelize(records,
1), commitTime);
+ assertEquals(commitTime, metaClient.reloadActiveTimeline()
+ .filterInflightsAndRequested().lastInstant().get().requestedTime());
+ return Pair.of(commitTime, result);
+ }
+
protected HoodieInstant createRequestedClusterInstant(HoodieTableMetaClient
metaClient, String clusterTime, List<FileSlice>[] fileSlices) throws
IOException {
HoodieClusteringPlan clusteringPlan =
ClusteringUtils.createClusteringPlan(EXECUTION_STRATEGY_CLASS_NAME.defaultValue(),
STRATEGY_PARAMS, fileSlices, Collections.emptyMap());
@@ -1902,26 +1900,14 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
*/
@Test
public void testClusteringFailsOnPendingIngestionRequestedInstant() throws
Exception {
- Properties properties = getDisabledRowWriterProperties();
- properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath +
"/.hoodie/.locks");
+ Properties properties = createOccProperties();
HoodieCleanConfig cleanConfig =
createCleanConfig(HoodieFailedWritesCleaningPolicy.LAZY, false);
+ int numRecords = 200;
// Insert base data with a regular ingestion writer
- HoodieWriteConfig insertWriteConfig = getConfigBuilder()
- .withCleanConfig(cleanConfig)
- .withLockConfig(createLockConfig(new
PreferWriterConflictResolutionStrategy()))
-
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
- .withProperties(properties)
- .build();
- SparkRDDWriteClient client = getHoodieWriteClient(insertWriteConfig);
-
- int numRecords = 200;
- String firstCommit = WriteClientTestUtils.createNewInstantTime();
- String partitionStr = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
- HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new
String[] {partitionStr});
- writeBatch(client, firstCommit, "000", Option.of(Arrays.asList("000")),
"000",
- numRecords, dataGenerator::generateInserts,
SparkRDDWriteClient::insert, true, numRecords, numRecords,
- 1, INSTANT_GENERATOR);
+ SparkRDDWriteClient client = getHoodieWriteClient(
+ buildOccWriteConfig(properties, cleanConfig, new
PreferWriterConflictResolutionStrategy()));
+ seedTableWithFirstCommit(client, numRecords);
// Simulate an ingestion writer that has created a .requested commit with
an active heartbeat
String ingestionRequestedTime =
WriteClientTestUtils.createNewInstantTime();