lokeshj1703 commented on code in PR #13229:
URL: https://github.com/apache/hudi/pull/13229#discussion_r2085324043
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java:
##########
@@ -284,48 +293,56 @@ public void testMergeOnReadRestoreCompactionCommit()
throws IOException {
assertEquals(1, secondPartitionCommit2LogFiles.size());
HoodieTable table = this.getHoodieTable(metaClient, cfg);
- //3. rollback the update to partition1 and partition2
- HoodieInstant rollBackInstant =
INSTANT_GENERATOR.createNewInstant(isUsingMarkers ?
HoodieInstant.State.INFLIGHT : HoodieInstant.State.COMPLETED,
- HoodieTimeline.DELTA_COMMIT_ACTION, "002");
- BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
- new BaseRollbackPlanActionExecutor(context, cfg, table, "003",
rollBackInstant, false,
- cfg.shouldRollbackUsingMarkers(), true);
- mergeOnReadRollbackPlanActionExecutor.execute().get();
- MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new
MergeOnReadRollbackActionExecutor(
- context,
- cfg,
- table,
- "003",
- rollBackInstant,
- true,
- false);
- //3. assert the rollback stat
- Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata =
mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata();
- assertEquals(2, rollbackMetadata.size());
- assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table,
"002").doesMarkerDirExist());
-
- // rollback 001 as well. this time since its part of the restore, entire
file slice should be deleted and not just log files (for partition1 and
partition2)
- HoodieInstant rollBackInstant1 =
INSTANT_GENERATOR.createNewInstant(isUsingMarkers ?
HoodieInstant.State.INFLIGHT : HoodieInstant.State.COMPLETED,
- HoodieTimeline.DELTA_COMMIT_ACTION, "001");
- BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor1 =
- new BaseRollbackPlanActionExecutor(context, cfg, table, "004",
rollBackInstant1, false,
- cfg.shouldRollbackUsingMarkers(), true);
- mergeOnReadRollbackPlanActionExecutor1.execute().get();
- MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor1 = new
MergeOnReadRollbackActionExecutor(
- context,
- cfg,
- table,
- "004",
- rollBackInstant1,
- true,
- false);
- mergeOnReadRollbackActionExecutor1.execute().getPartitionMetadata();
-
- //assert there are no valid file groups in both partition1 and partition2
- assertEquals(0,
table.getFileSystemView().getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).count());
- assertEquals(0,
table.getFileSystemView().getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).count());
- // and only 3rd partition should have valid file groups.
-
assertTrue(table.getFileSystemView().getAllFileGroups(DEFAULT_THIRD_PARTITION_PATH).count()
> 0);
+ // Start a client so that timeline server starts
+ client = getHoodieWriteClient(cfg);
+ // Sleep for 1 second to ensure the timeline server port is listening
+ Thread.sleep(1000);
Review Comment:
BaseRollbackPlanActionExecutor makes a call to the timeline server. A
comment has been added to explain this.
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java:
##########
@@ -110,52 +111,57 @@ public void testMergeOnReadRollbackActionExecutor(boolean
isUsingMarkers) throws
assertEquals(1, secondPartitionCommit2LogFiles.size());
HoodieTable table = this.getHoodieTable(metaClient, cfg);
- //2. rollback
- HoodieInstant rollBackInstant =
INSTANT_GENERATOR.createNewInstant(isUsingMarkers ?
HoodieInstant.State.INFLIGHT : HoodieInstant.State.COMPLETED,
- HoodieTimeline.DELTA_COMMIT_ACTION, "002");
- BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
- new BaseRollbackPlanActionExecutor(context, cfg, table, "003",
rollBackInstant, false,
- cfg.shouldRollbackUsingMarkers(), false);
- mergeOnReadRollbackPlanActionExecutor.execute().get();
- MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new
MergeOnReadRollbackActionExecutor(
- context,
- cfg,
- table,
- "003",
- rollBackInstant,
- true,
- false);
- //3. assert the rollback stat
- Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata =
mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata();
- assertEquals(2, rollbackMetadata.size());
-
- for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry :
rollbackMetadata.entrySet()) {
- HoodieRollbackPartitionMetadata meta = entry.getValue();
- assertEquals(0, meta.getFailedDeleteFiles().size());
- assertEquals(1, meta.getSuccessDeleteFiles().size());
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+ // create client so that timeline server starts
+ // Wait for embedded timeline server port to be ready
+ Thread.sleep(1000);
Review Comment:
BaseRollbackPlanActionExecutor makes a call to the timeline server. A
comment has been added to explain this.
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java:
##########
@@ -112,41 +111,41 @@ public void testPartitionChanges(HoodieTableType
tableType, IndexType indexType)
String commitTimeAtEpoch0 = getCommitTimeAtUTC(0);
List<HoodieRecord> insertsAtEpoch0 = getInserts(totalRecords, p1, 0,
payloadClass);
client.startCommitWithTime(commitTimeAtEpoch0);
- assertNoWriteErrors(client.upsert(jsc().parallelize(insertsAtEpoch0, 2),
commitTimeAtEpoch0).collect());
+ client.commit(commitTimeAtEpoch0,
client.upsert(jsc().parallelize(insertsAtEpoch0, 2), commitTimeAtEpoch0));
// 2nd batch: normal updates same partition
String commitTimeAtEpoch5 = getCommitTimeAtUTC(5);
List<HoodieRecord> updatesAtEpoch5 = getUpdates(insertsAtEpoch0, 5,
payloadClass);
client.startCommitWithTime(commitTimeAtEpoch5);
- assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch5, 2),
commitTimeAtEpoch5).collect());
+ client.commit(commitTimeAtEpoch5,
client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch5));
readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 5);
// 3rd batch: update all from p1 to p2
String commitTimeAtEpoch6 = getCommitTimeAtUTC(6);
List<HoodieRecord> updatesAtEpoch6 = getUpdates(updatesAtEpoch5, p2, 6,
payloadClass);
client.startCommitWithTime(commitTimeAtEpoch6);
- assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch6, 2),
commitTimeAtEpoch6).collect());
+ client.commit(commitTimeAtEpoch6,
client.upsert(jsc().parallelize(updatesAtEpoch6, 2), commitTimeAtEpoch6));
readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p2, 6);
// 4th batch: update all from p2 to p3
String commitTimeAtEpoch7 = getCommitTimeAtUTC(7);
List<HoodieRecord> updatesAtEpoch7 = getUpdates(updatesAtEpoch6, p3, 7,
payloadClass);
client.startCommitWithTime(commitTimeAtEpoch7);
- assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch7, 2),
commitTimeAtEpoch7).collect());
+ client.commit(commitTimeAtEpoch7,
client.upsert(jsc().parallelize(updatesAtEpoch7, 2), commitTimeAtEpoch7));
readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p3, 7);
// 5th batch: late update all to p4; discarded
String commitTimeAtEpoch8 = getCommitTimeAtUTC(8);
List<HoodieRecord> updatesAtEpoch2 = getUpdates(insertsAtEpoch0, p4, 2,
payloadClass);
client.startCommitWithTime(commitTimeAtEpoch8);
- assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch2, 2),
commitTimeAtEpoch8).collect());
Review Comment:
Addressed
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java:
##########
@@ -242,19 +243,16 @@ public void
testHoodieClientMultiWriterWithClustering(HoodieTableType tableType)
private void createCommitWithInserts(HoodieWriteConfig cfg,
SparkRDDWriteClient client,
String prevCommitTime, String
newCommitTime, int numRecords) throws Exception {
- // Finish first base commmit
- JavaRDD<WriteStatus> result = insertFirstBatch(cfg, client, newCommitTime,
prevCommitTime, numRecords, SparkRDDWriteClient::bulkInsert,
+ insertFirstBatch(cfg, client, newCommitTime, prevCommitTime, numRecords,
SparkRDDWriteClient::bulkInsert,
false, false, numRecords, INSTANT_GENERATOR);
- assertTrue(client.commit(newCommitTime, result), "Commit should succeed");
}
private void createCommitWithUpserts(HoodieWriteConfig cfg,
SparkRDDWriteClient client, String prevCommit,
String commitTimeBetweenPrevAndNew,
String newCommitTime, int numRecords)
throws Exception {
- JavaRDD<WriteStatus> result = updateBatch(cfg, client, newCommitTime,
prevCommit,
+ updateBatch(cfg, client, newCommitTime, prevCommit,
Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000",
numRecords, SparkRDDWriteClient::upsert, false, false,
numRecords, 200, 2, INSTANT_GENERATOR);
- client.commit(newCommitTime, result);
Review Comment:
The updateBatch API used here eventually calls the overloaded method with
skipCommit set to false, so an explicit client.commit call is not needed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]