nsivabalan commented on code in PR #13653:
URL: https://github.com/apache/hudi/pull/13653#discussion_r2248186882
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java:
##########
@@ -49,13 +53,15 @@ public static void deleteSavepoint(HoodieTable table,
String savepointTime) {
public static void validateSavepointRestore(HoodieTable table, String
savepointTime) {
// Make sure the restore was successful
table.getMetaClient().reloadActiveTimeline();
- Option<HoodieInstant> lastInstant = table.getActiveTimeline()
+ Option<HoodieInstant> lastInstant =
Option.fromJavaOptional(table.getActiveTimeline()
Review Comment:
minor. can we add Javadoc to L52 to call out what we are looking to
validate
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/common/InstantComparators.java:
##########
@@ -79,6 +79,15 @@ public CompletionTimeBasedComparator(Map<String, String>
comparableActions) {
@Override
public int compare(HoodieInstant instant1, HoodieInstant instant2) {
+ if (instant1.getCompletionTime() == null && instant2.getCompletionTime()
!= null) {
Review Comment:
did we add UTs for this?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java:
##########
@@ -326,21 +466,246 @@ void testCleaningCompletedRollback() throws Exception {
// restore
client.restoreToSavepoint(Objects.requireNonNull(savepointCommit,
"restore commit should not be null"));
- assertRowNumberEqualsTo(20);
+ assertEquals(commitToRowCount, getRecordCountPerCommit());
+ validateFilesMetadata(hoodieWriteConfig);
+ assertEquals(tableVersion,
HoodieTableMetaClient.reload(metaClient).getTableConfig().getTableVersion());
}
}
- private void upsertBatch(SparkRDDWriteClient client, List<HoodieRecord>
baseRecordsToUpdate) throws IOException {
+ @Test
+ void rollbackWithAsyncServices_compactionCompletesDuringCommit() {
+ HoodieWriteConfig hoodieWriteConfig =
getHoodieWriteConfigWithCompactionAndConcurrencyControl(HoodieTableVersion.EIGHT);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ final int numRecords = 10;
+ writeInitialCommitsForAsyncServicesTests(numRecords);
+ String inflightCommit = client.startCommit();
+ List<HoodieRecord> records =
dataGen.generateUniqueUpdates(inflightCommit, numRecords);
+ JavaRDD<WriteStatus> writeStatus =
writeClient.upsert(jsc.parallelize(records, 1), inflightCommit);
+
+ // Run compaction while delta-commit is in-flight
+ Option<String> compactionInstant =
client.scheduleCompaction(Option.empty());
+ HoodieWriteMetadata result = client.compact(compactionInstant.get());
+ client.commitCompaction(compactionInstant.get(), result, Option.empty());
+ // commit the inflight delta commit
+ client.commit(inflightCommit, writeStatus);
+
+ client.savepoint(inflightCommit, "user1", "Savepoint for commit that
completed after compaction");
+
+ // write one more commit
+ String newCommitTime = client.startCommit();
+ records = dataGen.generateInserts(newCommitTime, numRecords);
+ writeStatus = client.insert(jsc.parallelize(records, 1), newCommitTime);
+ client.commit(newCommitTime, writeStatus);
+
+ // restore to savepoint
+ client.restoreToSavepoint(inflightCommit);
+ validateFilesMetadata(hoodieWriteConfig);
+ assertEquals(Collections.singletonMap(inflightCommit, numRecords),
getRecordCountPerCommit());
+ // ensure the compaction instant is still present because it was
completed before the target of the restore
+
assertTrue(metaClient.reloadActiveTimeline().filterCompletedInstants().getInstantsAsStream()
+ .anyMatch(hoodieInstant ->
hoodieInstant.requestedTime().equals(compactionInstant.get())));
+ }
+ }
+
+ @Test
+ void rollbackWithAsyncServices_commitCompletesDuringCompaction() {
Review Comment:
same here. parametrize w/ both 6 and 8
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointHelpers.java:
##########
@@ -66,4 +72,28 @@ public static void validateSavepointPresence(HoodieTable
table, String savepoint
throw new HoodieRollbackException("No savepoint for instantTime " +
savepointTime);
}
}
+
+ private static class SavepointInstantComparator implements
Comparator<HoodieInstant> {
+ private final boolean tableVersion8OrLater;
+ private final InstantComparator instantComparator;
+
+ public SavepointInstantComparator(boolean tableVersion8OrLater,
InstantComparator instantComparator) {
+ this.tableVersion8OrLater = tableVersion8OrLater;
+ this.instantComparator = instantComparator;
+ }
+
+ @Override
+ public int compare(HoodieInstant o1, HoodieInstant o2) {
+ if (tableVersion8OrLater) {
+ return instantComparator.completionTimeOrderedComparator().compare(o1,
o2);
+ } else {
+ // Do to special handling of compaction instants, we need to use
requested time based comparator for compaction instants but completion time
based comparator for others
+ if (o1.getAction().equals(HoodieTimeline.COMMIT_ACTION) ||
o2.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
Review Comment:
yes. this is not intuitive to understand. can we add some simple
illustration.
t1.dc,.... t2.dc, t11.compaction.req, t12.dc, t11.commit, t13.dc ... dc15.
If we are looking to restore to t12, then since we are ordering the commits to
rollback based on completion time, we would roll back the t11 compaction as well
(since t11 completed after t12 completed). But we can't do that, and hence the
special handling.
But I am trying to understand why this special handling is not required for table
version 8 and above.
How are we handling this case for a v8 table without the special handling?
From https://github.com/apache/hudi/pull/13653/files#r2246930654, I only see
we account for completed instant time or requested instant time.
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java:
##########
@@ -326,21 +466,246 @@ void testCleaningCompletedRollback() throws Exception {
// restore
client.restoreToSavepoint(Objects.requireNonNull(savepointCommit,
"restore commit should not be null"));
- assertRowNumberEqualsTo(20);
+ assertEquals(commitToRowCount, getRecordCountPerCommit());
+ validateFilesMetadata(hoodieWriteConfig);
+ assertEquals(tableVersion,
HoodieTableMetaClient.reload(metaClient).getTableConfig().getTableVersion());
}
}
- private void upsertBatch(SparkRDDWriteClient client, List<HoodieRecord>
baseRecordsToUpdate) throws IOException {
+ @Test
+ void rollbackWithAsyncServices_compactionCompletesDuringCommit() {
Review Comment:
should we parametrize this for versions 6 and 8?
With all the special handling we are doing in restore, it's worth adding tests for
both table versions.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java:
##########
@@ -317,56 +293,45 @@ private static String formatDeletePath(String path) {
return path.substring(path.indexOf(":") + 1);
}
- private List<StoragePathInfo> listBaseFilesToBeDeleted(String commit,
- String
basefileExtension,
- String partitionPath,
- HoodieStorage
storage) throws IOException {
- LOG.info("Collecting files to be cleaned/rolledback up for path " +
partitionPath + " and commit " + commit);
+ private List<StoragePath> listBaseFilesToBeDeleted(String commit,
+ String basefileExtension,
+ String partitionPath,
+ HoodieStorage storage)
throws IOException {
+ LOG.info("Collecting files to be cleaned/rolledback up for path {} and
commit {}", partitionPath, commit);
StoragePathFilter filter = (path) -> {
if (path.toString().contains(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
}
return false;
};
- return
storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(),
partitionPath), filter);
+ return
storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(),
partitionPath),
filter).stream().map(StoragePathInfo::getPath).collect(Collectors.toList());
}
- private List<StoragePathInfo> fetchFilesFromInstant(HoodieInstant
instantToRollback,
- String partitionPath,
String basePath,
- String
baseFileExtension, HoodieStorage storage,
-
Option<HoodieCommitMetadata> commitMetadataOptional,
- Boolean
isCommitMetadataCompleted,
- HoodieTableType
tableType) throws IOException {
- // go w/ commit metadata only for COW table. for MOR, we need to get
associated log files when commit corresponding to base file is rolledback.
- if (isCommitMetadataCompleted && tableType ==
HoodieTableType.COPY_ON_WRITE) {
- return fetchFilesFromCommitMetadata(instantToRollback, partitionPath,
basePath, commitMetadataOptional.get(),
- baseFileExtension, storage);
+ private List<StoragePath> fetchFilesFromInstant(HoodieInstant
instantToRollback,
+ String partitionPath, String
basePath,
+ String baseFileExtension,
HoodieStorage storage,
+ Option<HoodieCommitMetadata>
commitMetadataOptional,
+ boolean
isCommitMetadataCompleted,
+ HoodieTableType tableType,
+ HoodieTableVersion
tableVersion) throws IOException {
+ // for MOR tables with version < 8, listing is required to fetch the log
files associated with base files added by this commit.
+ if (isCommitMetadataCompleted && (tableType ==
HoodieTableType.COPY_ON_WRITE ||
tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))) {
Review Comment:
good catch. this can only work in table version 8 and above, if we are ordering
the commits based on completion time.
Guess that fix simplified this.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java:
##########
@@ -317,56 +293,45 @@ private static String formatDeletePath(String path) {
return path.substring(path.indexOf(":") + 1);
}
- private List<StoragePathInfo> listBaseFilesToBeDeleted(String commit,
- String
basefileExtension,
- String partitionPath,
- HoodieStorage
storage) throws IOException {
- LOG.info("Collecting files to be cleaned/rolledback up for path " +
partitionPath + " and commit " + commit);
+ private List<StoragePath> listBaseFilesToBeDeleted(String commit,
+ String basefileExtension,
+ String partitionPath,
+ HoodieStorage storage)
throws IOException {
+ LOG.info("Collecting files to be cleaned/rolledback up for path {} and
commit {}", partitionPath, commit);
StoragePathFilter filter = (path) -> {
if (path.toString().contains(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
}
return false;
};
- return
storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(),
partitionPath), filter);
+ return
storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(),
partitionPath),
filter).stream().map(StoragePathInfo::getPath).collect(Collectors.toList());
}
- private List<StoragePathInfo> fetchFilesFromInstant(HoodieInstant
instantToRollback,
- String partitionPath,
String basePath,
- String
baseFileExtension, HoodieStorage storage,
-
Option<HoodieCommitMetadata> commitMetadataOptional,
- Boolean
isCommitMetadataCompleted,
- HoodieTableType
tableType) throws IOException {
- // go w/ commit metadata only for COW table. for MOR, we need to get
associated log files when commit corresponding to base file is rolledback.
- if (isCommitMetadataCompleted && tableType ==
HoodieTableType.COPY_ON_WRITE) {
- return fetchFilesFromCommitMetadata(instantToRollback, partitionPath,
basePath, commitMetadataOptional.get(),
- baseFileExtension, storage);
+ private List<StoragePath> fetchFilesFromInstant(HoodieInstant
instantToRollback,
+ String partitionPath, String
basePath,
+ String baseFileExtension,
HoodieStorage storage,
+ Option<HoodieCommitMetadata>
commitMetadataOptional,
+ boolean
isCommitMetadataCompleted,
+ HoodieTableType tableType,
+ HoodieTableVersion
tableVersion) throws IOException {
+ // for MOR tables with version < 8, listing is required to fetch the log
files associated with base files added by this commit.
+ if (isCommitMetadataCompleted && (tableType ==
HoodieTableType.COPY_ON_WRITE ||
tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))) {
+ return fetchFilesFromCommitMetadata(instantToRollback, partitionPath,
basePath, commitMetadataOptional.get(), baseFileExtension);
} else {
return fetchFilesFromListFiles(instantToRollback, partitionPath,
basePath, baseFileExtension, storage);
}
}
- private List<StoragePathInfo> fetchFilesFromCommitMetadata(HoodieInstant
instantToRollback,
- String
partitionPath,
- String basePath,
-
HoodieCommitMetadata commitMetadata,
- String
baseFileExtension,
- HoodieStorage
storage) throws IOException {
+ private List<StoragePath> fetchFilesFromCommitMetadata(HoodieInstant
instantToRollback,
+ String partitionPath,
+ String basePath,
+ HoodieCommitMetadata
commitMetadata,
+ String
baseFileExtension) {
StoragePathFilter pathFilter = getPathFilter(baseFileExtension,
instantToRollback.requestedTime());
- List<StoragePath> filePaths = getFilesFromCommitMetadata(basePath,
commitMetadata, partitionPath)
- .filter(entry -> {
- try {
- return storage.exists(entry);
- } catch (Exception e) {
- LOG.error("Exists check failed for " + entry.toString(), e);
- }
- // if any Exception is thrown, do not ignore. let's try to add the
file of interest to be deleted. we can't miss any files to be rolled back.
- return true;
- }).collect(Collectors.toList());
-
- return storage.listDirectEntries(filePaths, pathFilter);
+ return getFilesFromCommitMetadata(basePath, commitMetadata, partitionPath)
Review Comment:
yeah. our rollback execution should be fine even if the file does not exist. We
are good to remove this.
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSavepointRestoreMergeOnRead.java:
##########
@@ -326,21 +466,246 @@ void testCleaningCompletedRollback() throws Exception {
// restore
client.restoreToSavepoint(Objects.requireNonNull(savepointCommit,
"restore commit should not be null"));
- assertRowNumberEqualsTo(20);
+ assertEquals(commitToRowCount, getRecordCountPerCommit());
+ validateFilesMetadata(hoodieWriteConfig);
+ assertEquals(tableVersion,
HoodieTableMetaClient.reload(metaClient).getTableConfig().getTableVersion());
}
}
- private void upsertBatch(SparkRDDWriteClient client, List<HoodieRecord>
baseRecordsToUpdate) throws IOException {
+ @Test
+ void rollbackWithAsyncServices_compactionCompletesDuringCommit() {
+ HoodieWriteConfig hoodieWriteConfig =
getHoodieWriteConfigWithCompactionAndConcurrencyControl(HoodieTableVersion.EIGHT);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ final int numRecords = 10;
+ writeInitialCommitsForAsyncServicesTests(numRecords);
+ String inflightCommit = client.startCommit();
+ List<HoodieRecord> records =
dataGen.generateUniqueUpdates(inflightCommit, numRecords);
+ JavaRDD<WriteStatus> writeStatus =
writeClient.upsert(jsc.parallelize(records, 1), inflightCommit);
+
+ // Run compaction while delta-commit is in-flight
+ Option<String> compactionInstant =
client.scheduleCompaction(Option.empty());
+ HoodieWriteMetadata result = client.compact(compactionInstant.get());
+ client.commitCompaction(compactionInstant.get(), result, Option.empty());
+ // commit the inflight delta commit
+ client.commit(inflightCommit, writeStatus);
+
+ client.savepoint(inflightCommit, "user1", "Savepoint for commit that
completed after compaction");
+
+ // write one more commit
+ String newCommitTime = client.startCommit();
+ records = dataGen.generateInserts(newCommitTime, numRecords);
+ writeStatus = client.insert(jsc.parallelize(records, 1), newCommitTime);
+ client.commit(newCommitTime, writeStatus);
+
+ // restore to savepoint
+ client.restoreToSavepoint(inflightCommit);
+ validateFilesMetadata(hoodieWriteConfig);
+ assertEquals(Collections.singletonMap(inflightCommit, numRecords),
getRecordCountPerCommit());
+ // ensure the compaction instant is still present because it was
completed before the target of the restore
+
assertTrue(metaClient.reloadActiveTimeline().filterCompletedInstants().getInstantsAsStream()
+ .anyMatch(hoodieInstant ->
hoodieInstant.requestedTime().equals(compactionInstant.get())));
+ }
+ }
+
+ @Test
+ void rollbackWithAsyncServices_commitCompletesDuringCompaction() {
+ HoodieWriteConfig hoodieWriteConfig =
getHoodieWriteConfigWithCompactionAndConcurrencyControl(HoodieTableVersion.EIGHT);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ final int numRecords = 10;
+ writeInitialCommitsForAsyncServicesTests(numRecords);
+ String inflightCommit = client.startCommit();
+ List<HoodieRecord> records =
dataGen.generateUniqueUpdates(inflightCommit, numRecords);
+ JavaRDD<WriteStatus> writeStatus =
client.upsert(jsc.parallelize(records, 1), inflightCommit);
+
+ Option<String> compactionInstant =
client.scheduleCompaction(Option.empty());
+ HoodieWriteMetadata result = client.compact(compactionInstant.get());
+ // commit the inflight delta commit
+ client.commit(inflightCommit, writeStatus);
+ // commit the compaction instant after the delta commit
+ client.commitCompaction(compactionInstant.get(), result, Option.empty());
+
+ client.savepoint(inflightCommit, "user1", "Savepoint for commit that
completed during compaction");
+
+ // write one more commit
+ String newCommitTime = writeClient.startCommit();
+ records = dataGen.generateInserts(newCommitTime, numRecords);
+ writeStatus = client.insert(jsc.parallelize(records, 1), newCommitTime);
+ client.commit(newCommitTime, writeStatus);
+
+ // restore to savepoint
+ client.restoreToSavepoint(inflightCommit);
+ validateFilesMetadata(hoodieWriteConfig);
+ assertEquals(Collections.singletonMap(inflightCommit, numRecords),
getRecordCountPerCommit());
+ // ensure the compaction instant is not present because it was completed
after the target of the restore
+
assertFalse(metaClient.reloadActiveTimeline().filterCompletedInstants().getInstantsAsStream()
+ .anyMatch(hoodieInstant ->
hoodieInstant.requestedTime().equals(compactionInstant.get())));
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "NINE"})
+ void
rollbackWithAsyncServices_commitStartsAndFinishesDuringCompaction(HoodieTableVersion
tableVersion) {
+ HoodieWriteConfig hoodieWriteConfig =
getHoodieWriteConfigWithCompactionAndConcurrencyControl(tableVersion);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ final int numRecords = 10;
+ writeInitialCommitsForAsyncServicesTests(numRecords);
+ // Schedule a compaction
+ Option<String> compactionInstant =
client.scheduleCompaction(Option.empty());
+ // Start a delta commit
+ String inflightCommit = client.startCommit();
+ List<HoodieRecord> records =
dataGen.generateUniqueUpdates(inflightCommit, numRecords);
+ JavaRDD<WriteStatus> writeStatus =
client.upsert(jsc.parallelize(records, 1), inflightCommit);
+
+ HoodieWriteMetadata result = client.compact(compactionInstant.get());
+ // commit the inflight delta commit
+ client.commit(inflightCommit, writeStatus);
+ // commit the compaction instant after the delta commit
+ client.commitCompaction(compactionInstant.get(), result, Option.empty());
+
+ client.savepoint(inflightCommit, "user1", "Savepoint for commit that
completed during compaction");
+
+ // write one more commit
+ String newCommitTime = writeClient.startCommit();
+ records = dataGen.generateInserts(newCommitTime, numRecords);
+ writeStatus = client.insert(jsc.parallelize(records, 1), newCommitTime);
+ client.commit(newCommitTime, writeStatus);
+
+ // restore to savepoint
+ client.restoreToSavepoint(inflightCommit);
+ validateFilesMetadata(hoodieWriteConfig);
+ assertEquals(Collections.singletonMap(inflightCommit, numRecords),
getRecordCountPerCommit());
+ boolean compactionIsPresent =
metaClient.reloadActiveTimeline().filterCompletedInstants().getInstantsAsStream()
+ .anyMatch(hoodieInstant ->
hoodieInstant.requestedTime().equals(compactionInstant.get()));
+ if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+ // ensure the compaction instant is not present because it was
completed after the target of the restore
+ assertFalse(compactionIsPresent);
+ } else {
+ // for versions before 8, the compaction instant is not removed
because the log files of the savepoint instant are associated with this
compaction instant
+ assertTrue(compactionIsPresent);
+ }
+ assertEquals(tableVersion,
HoodieTableMetaClient.reload(metaClient).getTableConfig().getTableVersion());
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "NINE"})
+ void testMissingFileDoesNotFallRestore(HoodieTableVersion tableVersion)
throws Exception {
+ HoodieWriteConfig hoodieWriteConfig =
getHoodieWriteConfigAndInitializeTable(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(4)
+ .withInlineCompaction(false)
+ .compactionSmallFileSize(0).build(), tableVersion);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ // establish base files
+ String firstCommitTime = client.startCommit();
+ int numRecords = 10;
+ List<HoodieRecord> initialRecords =
dataGen.generateInserts(firstCommitTime, numRecords);
+ client.commit(firstCommitTime,
client.insert(jsc.parallelize(initialRecords, 1), firstCommitTime));
+ // add updates that go to log files
+ String secondCommitTime = client.startCommit();
+ List<HoodieRecord> updatedRecords =
dataGen.generateUniqueUpdates(secondCommitTime, numRecords);
+ client.commit(secondCommitTime,
client.upsert(jsc.parallelize(updatedRecords, 1), secondCommitTime));
+ client.savepoint(secondCommitTime, "user1", "Savepoint for commit that
completed during compaction");
+
+ // add a third delta commit with log and new base files
+ String thirdCommitTime = client.startCommit();
+ List<HoodieRecord> upsertRecords =
Stream.concat(dataGen.generateUniqueUpdates(thirdCommitTime,
numRecords).stream(),
+ dataGen.generateInserts(thirdCommitTime,
numRecords).stream()).collect(Collectors.toList());
+ List<WriteStatus> writeStatuses =
client.upsert(jsc.parallelize(upsertRecords, 1), thirdCommitTime).collect();
+ client.commit(thirdCommitTime, jsc.parallelize(writeStatuses, 1));
+
+ // delete one base file and one log file to validate both cases are
handled gracefully
+ boolean deletedLogFile = false;
+ boolean deletedBaseFile = false;
+ for (WriteStatus writeStatus : writeStatuses) {
+ StoragePath path = FSUtils.constructAbsolutePath(basePath,
writeStatus.getStat().getPath());
+ if (deletedLogFile && deletedBaseFile) {
+ break;
+ }
+ if (FSUtils.isLogFile(path)) {
+ deletedLogFile = true;
+ storage.deleteFile(path);
+ } else {
+ deletedBaseFile = true;
+ storage.deleteFile(path);
+ }
+ }
+ client.restoreToSavepoint(secondCommitTime);
+ validateFilesMetadata(hoodieWriteConfig);
+ assertEquals(Collections.singletonMap(secondCommitTime, numRecords),
getRecordCountPerCommit());
+ }
+ }
+
+ private Map<String, Integer> writeInitialCommitsForAsyncServicesTests(int
numRecords) {
+ Map<String, Integer> commitToRowCount = new HashMap<>();
+ for (int i = 0; i < 3; i++) {
+ String newCommitTime = writeClient.startCommit();
+ List<HoodieRecord> records = i == 0 ?
dataGen.generateInserts(newCommitTime, numRecords) :
dataGen.generateUniqueUpdates(newCommitTime, numRecords);
+ JavaRDD<WriteStatus> writeStatus = i == 0 ?
writeClient.insert(jsc.parallelize(records, 1), newCommitTime) :
writeClient.upsert(jsc.parallelize(records, 1), newCommitTime);
+ writeClient.commit(newCommitTime, writeStatus);
+ if (i == 2) {
+ commitToRowCount.put(newCommitTime, numRecords);
+ }
+ }
+ return commitToRowCount;
+ }
+
+ private HoodieWriteConfig
getHoodieWriteConfigWithCompactionAndConcurrencyControl(HoodieTableVersion
tableVersion) {
+ HoodieWriteConfig config =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ .withLockConfig(HoodieLockConfig.newBuilder()
+ .withLockProvider(InProcessLockProvider.class)
+ .build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(2)
+ .withInlineCompaction(false)
+ .withScheduleInlineCompaction(false)
+ .build())
+ .withRollbackUsingMarkers(true)
+ .withAutoUpgradeVersion(false)
+ .withWriteTableVersion(tableVersion.versionCode())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+
.withStreamingWriteEnabled(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT))
+ .build())
+
.withProps(Collections.singletonMap(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(),
"0"))
+ .build();
+ try {
+ initMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+ return config;
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to initialize HoodieTableMetaClient",
e);
+ }
+ }
+
+ private void validateFilesMetadata(HoodieWriteConfig writeConfig) {
Review Comment:
this validation def gives us good confidence now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]