nsivabalan commented on a change in pull request #4446:
URL: https://github.com/apache/hudi/pull/4446#discussion_r780553443
##########
File path:
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java
##########
@@ -133,6 +142,8 @@ private void assignInserts(WorkloadProfile profile,
HoodieEngineContext context)
for (String partitionPath : partitionPaths) {
WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
+ WorkloadStat executionStat =
executionProfile.getOrDefault(partitionPath, new WorkloadStat());
+ executionProfile.putIfAbsent(partitionPath, executionStat);
Review comment:
Should we move this line to around L202, i.e. after updating
executionStat?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
##########
@@ -182,14 +182,28 @@ public abstract void preCompact(
.withOperationField(config.allowOperationMetadataField())
.withPartition(operation.getPartitionPath())
.build();
- if (!scanner.iterator().hasNext()) {
- scanner.close();
- return new ArrayList<>();
- }
Option<HoodieBaseFile> oldDataFileOpt =
operation.getBaseFile(metaClient.getBasePath(),
operation.getPartitionPath());
+ // Considering following scenario: if all log blocks in this fileSlice is
rollback, it returns an empty scanner.
+ // But in this case, we need to give it a base file. Otherwise, it will
lose base file in following fileSlice.
+ if (!scanner.iterator().hasNext()) {
+ if (!oldDataFileOpt.isPresent()) {
+ scanner.close();
+ return new ArrayList<>();
+ } else {
+ // TODO: we may directly rename original parquet file if there is not
evolution/devolution of schema
Review comment:
Do we know whether scanner.getRecords() will return an empty map (and not
null) when scanner.iterator().hasNext() is false? With the fix we have put
up, we might end up calling
```
compactionHandler.handleUpdate(instantTime, operation.getPartitionPath(),
operation.getFileId(), scanner.getRecords(),
oldDataFileOpt.get());
```
at L211 when all log files are to be rolled back and only the base file
remains to be compacted.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
##########
@@ -169,6 +178,8 @@ private void assignInserts(WorkloadProfile profile,
HoodieEngineContext context)
for (String partitionPath : partitionPaths) {
WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
+ WorkloadStat executionStat =
executionProfile.getOrDefault(partitionPath, new WorkloadStat());
+ executionProfile.putIfAbsent(partitionPath, executionStat);
Review comment:
Should we move this to around L244, i.e. after updating
executionStat, and only then update the executionProfile map?
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
##########
@@ -140,6 +158,138 @@ public void testMergeOnReadRollbackActionExecutor(boolean
isUsingMarkers) throws
assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table,
"002").doesMarkerDirExist());
}
+ @Test
+ public void testRollbackForCanIndexLogFile() throws IOException {
+ cleanupResources();
+ setUpDFS();
+ //1. prepare data and assert data result
+ //just generate one partitions
+ dataGen = new HoodieTestDataGenerator(new
String[]{DEFAULT_FIRST_PARTITION_PATH});
+ HoodieWriteConfig cfg =
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2,
2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
+ .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+ .withWriteStatusClass(MetadataMergeWriteStatus.class)
+
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024
* 1024).build())
+
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 *
1024).parquetMaxFileSize(1024 * 1024).build())
+ .forTable("test-trip-table")
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withEnableBackupForRemoteFileSystemView(false) // Fail test if
problem connecting to timeline-server
+
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()).withRollbackUsingMarkers(false).withAutoCommit(false).build();
+
+ //1. prepare data
+ HoodieTestDataGenerator.writePartitionMetadata(fs, new
String[]{DEFAULT_FIRST_PARTITION_PATH}, basePath);
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+ /**
+ * Write 1 (only inserts)
+ */
+ String newCommitTime = "001";
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> records =
dataGen.generateInsertsForPartition(newCommitTime, 2,
DEFAULT_FIRST_PARTITION_PATH);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
+
org.apache.hudi.testutils.Assertions.assertNoWriteErrors(statuses.collect());
+ client.commit(newCommitTime, statuses);
+
+ // check fileSlice
+ HoodieTable table = this.getHoodieTable(metaClient, cfg);
+ SyncableFileSystemView fsView =
getFileSystemViewWithUnCommittedSlices(table.getMetaClient());
+ List<HoodieFileGroup> firstPartitionCommit2FileGroups =
fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
+ assertEquals(1, firstPartitionCommit2FileGroups.size());
+ assertEquals(1, (int)
firstPartitionCommit2FileGroups.get(0).getAllFileSlices().count());
+
assertFalse(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().findFirst().get().getBaseFile().isPresent());
+ assertEquals(1,
firstPartitionCommit2FileGroups.get(0).getAllFileSlices().findFirst().get().getLogFiles().count());
+ String generatedFileID =
firstPartitionCommit2FileGroups.get(0).getFileGroupId().getFileId();
+
+ // check hoodieCommitMeta
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+ table.getMetaClient().getCommitTimeline()
+ .getInstantDetails(new HoodieInstant(true,
HoodieTimeline.DELTA_COMMIT_ACTION, "001"))
+ .get(),
+ HoodieCommitMetadata.class);
+ List<HoodieWriteStat> firstPartitionWriteStat =
commitMetadata.getPartitionToWriteStats().get(DEFAULT_FIRST_PARTITION_PATH);
+ assertEquals(2, firstPartitionWriteStat.size());
+ // we have an empty writeStat for all partition
+ assert firstPartitionWriteStat.stream().anyMatch(wStat ->
StringUtils.isNullOrEmpty(wStat.getFileId()));
+ // we have one non-empty writeStat which must contains update or insert
+ assertEquals(1, firstPartitionWriteStat.stream().filter(wStat ->
!StringUtils.isNullOrEmpty(wStat.getFileId())).count());
+ firstPartitionWriteStat.stream().filter(wStat ->
!StringUtils.isNullOrEmpty(wStat.getFileId())).forEach(wStat -> {
+ assert wStat.getNumInserts() > 0;
+ });
+
+ /**
+ * Write 2 (inserts)
+ */
+ newCommitTime = "002";
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> updateRecords =
Collections.singletonList(dataGen.generateUpdateRecord(records.get(0).getKey(),
newCommitTime));
+ List<HoodieRecord> insertRecordsInSamePartition =
dataGen.generateInsertsForPartition(newCommitTime, 2,
DEFAULT_FIRST_PARTITION_PATH);
+ List<HoodieRecord> insertRecordsInOtherPartition =
dataGen.generateInsertsForPartition(newCommitTime, 2,
DEFAULT_SECOND_PARTITION_PATH);
+ List<HoodieRecord> recordsToBeWrite =
Stream.concat(Stream.concat(updateRecords.stream(),
insertRecordsInSamePartition.stream()), insertRecordsInOtherPartition.stream())
+ .collect(Collectors.toList());
+ writeRecords = jsc.parallelize(recordsToBeWrite, 1);
+ statuses = client.upsert(writeRecords, newCommitTime);
+ client.commit(newCommitTime, statuses);
+ table = this.getHoodieTable(metaClient, cfg);
+ commitMetadata = HoodieCommitMetadata.fromBytes(
+ table.getMetaClient().getCommitTimeline()
+ .getInstantDetails(new HoodieInstant(false,
HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime))
+ .get(),
+ HoodieCommitMetadata.class);
+ assert
commitMetadata.getPartitionToWriteStats().containsKey(DEFAULT_FIRST_PARTITION_PATH);
+ assert
commitMetadata.getPartitionToWriteStats().containsKey(DEFAULT_SECOND_PARTITION_PATH);
+ List<HoodieWriteStat> hoodieWriteStatOptionList =
commitMetadata.getPartitionToWriteStats().get(DEFAULT_FIRST_PARTITION_PATH);
+ // Both update and insert record should enter same existing fileGroup due
to small file handling
+ assertEquals(1, hoodieWriteStatOptionList.size());
+ assertEquals(generatedFileID,
hoodieWriteStatOptionList.get(0).getFileId());
+ // check insert and update numbers
+ assertEquals(2, hoodieWriteStatOptionList.get(0).getNumInserts());
+ assertEquals(1, hoodieWriteStatOptionList.get(0).getNumUpdateWrites());
+
+ List<HoodieWriteStat> secondHoodieWriteStatOptionList =
commitMetadata.getPartitionToWriteStats().get(DEFAULT_SECOND_PARTITION_PATH);
+ // All insert should enter one fileGroup
+ assertEquals(1, secondHoodieWriteStatOptionList.size());
+ String fileIdInPartitionTwo =
secondHoodieWriteStatOptionList.get(0).getFileId();
+ assertEquals(2, hoodieWriteStatOptionList.get(0).getNumInserts());
+
+ /**
+ * Rollback
+ */
+ HoodieInstant rollBackInstant = new
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION,
"002");
+ BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
+ new BaseRollbackPlanActionExecutor(context, cfg, table, "003",
rollBackInstant, false,
+ cfg.shouldRollbackUsingMarkers());
+ mergeOnReadRollbackPlanActionExecutor.execute().get();
+ MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new
MergeOnReadRollbackActionExecutor(
+ context,
+ cfg,
+ table,
+ "003",
+ rollBackInstant,
+ true,
+ false);
+
+ //3. assert the rollback stat
+ Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata =
mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata();
+ assertEquals(2, rollbackMetadata.size());
+
+ //4. assert filegroup after rollback, and compare to the rollbackstat
+ // assert the first partition data and log file size
+ HoodieRollbackPartitionMetadata partitionMetadata =
rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH);
+ assertTrue(partitionMetadata.getSuccessDeleteFiles().isEmpty());
+ assertTrue(partitionMetadata.getFailedDeleteFiles().isEmpty());
+ assertEquals(1, partitionMetadata.getRollbackLogFiles().size());
+ assertEquals(1, partitionMetadata.getWrittenLogFiles().size());
+
+ // assert the second partition data and log file size
+ partitionMetadata = rollbackMetadata.get(DEFAULT_SECOND_PARTITION_PATH);
+ assertEquals(1, partitionMetadata.getSuccessDeleteFiles().size());
+ assertTrue(partitionMetadata.getFailedDeleteFiles().isEmpty());
+ assertTrue(partitionMetadata.getRollbackLogFiles().isEmpty());
+ assertEquals(1, partitionMetadata.getSuccessDeleteFiles().size());
Review comment:
Can you add a read query at the end to validate that Hudi returns only
valid records after this rollback?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
##########
@@ -154,19 +154,21 @@ private void initKeyGenIfNeeded(boolean
populateMetaFields) {
LOG.info("RDD PreppedRecords was persisted at: " +
inputRecordsRDD.getStorageLevel());
}
- WorkloadProfile profile = null;
+ WorkloadProfile inputProfile = null;
if (isWorkloadProfileNeeded()) {
context.setJobStatus(this.getClass().getSimpleName(), "Building workload
profile");
- profile = new WorkloadProfile(buildProfile(inputRecordsRDD),
operationType);
- LOG.info("Workload profile :" + profile);
- saveWorkloadProfileMetadataToInflight(profile, instantTime);
+ inputProfile = new WorkloadProfile(buildProfile(inputRecordsRDD),
operationType);
+ LOG.info("Input workload profile :" + inputProfile);
}
// handle records update with clustering
JavaRDD<HoodieRecord<T>> inputRecordsRDDWithClusteringUpdate =
clusteringHandleUpdate(inputRecordsRDD);
// partition using the insert partitioner
- final Partitioner partitioner = getPartitioner(profile);
+ final Partitioner partitioner = getPartitioner(inputProfile);
+ WorkloadProfile executionProfile =
((org.apache.hudi.table.action.commit.Partitioner)partitioner).getExecutionWorkloadProfile();
Review comment:
I understand isWorkloadProfileNeeded() is always true for all engines,
but just to retain the old behavior, can we place these two lines within
the if block?
```
if (isWorkloadProfileNeeded()) {
WorkloadProfile executionProfile =
((org.apache.hudi.table.action.commit.Partitioner)partitioner).getExecutionWorkloadProfile();
saveWorkloadProfileMetadataToInflight(executionProfile, instantTime);
}
```
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
##########
@@ -182,14 +182,28 @@ public abstract void preCompact(
.withOperationField(config.allowOperationMetadataField())
.withPartition(operation.getPartitionPath())
.build();
- if (!scanner.iterator().hasNext()) {
- scanner.close();
- return new ArrayList<>();
- }
Option<HoodieBaseFile> oldDataFileOpt =
operation.getBaseFile(metaClient.getBasePath(),
operation.getPartitionPath());
+ // Considering following scenario: if all log blocks in this fileSlice is
rollback, it returns an empty scanner.
+ // But in this case, we need to give it a base file. Otherwise, it will
lose base file in following fileSlice.
+ if (!scanner.iterator().hasNext()) {
+ if (!oldDataFileOpt.isPresent()) {
+ scanner.close();
+ return new ArrayList<>();
+ } else {
+ // TODO: we may directly rename original parquet file if there is not
evolution/devolution of schema
Review comment:
Do these need to be cleaned up?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]