nsivabalan commented on a change in pull request #4446:
URL: https://github.com/apache/hudi/pull/4446#discussion_r780553443



##########
File path: hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java
##########
@@ -133,6 +142,8 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context)

    for (String partitionPath : partitionPaths) {
      WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
+      WorkloadStat executionStat = executionProfile.getOrDefault(partitionPath, new WorkloadStat());
+      executionProfile.putIfAbsent(partitionPath, executionStat);

Review comment:
       Shouldn't we move this line to L202 or so, i.e., after updating executionStat?
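   The ordering question above is mostly about readability rather than behavior: the map stores a reference to the mutable stat object, so mutating it after `putIfAbsent` is still visible through the map. A minimal, self-contained sketch (with a hypothetical `Stat` class standing in for Hudi's `WorkloadStat`):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for WorkloadStat: a mutable insert counter.
class Stat {
  long numInserts = 0;
  void addInserts(long n) { numInserts += n; }
}

public class PutIfAbsentOrdering {
  public static void main(String[] args) {
    Map<String, Stat> executionProfile = new HashMap<>();

    // Register the stat in the map first, mutate it afterwards.
    Stat executionStat = executionProfile.getOrDefault("partition-1", new Stat());
    executionProfile.putIfAbsent("partition-1", executionStat);
    executionStat.addInserts(10);

    // The mutation is visible through the map because the same reference is stored.
    System.out.println(executionProfile.get("partition-1").numInserts); // prints 10
  }
}
```

   Either ordering yields the same map contents, so moving the `putIfAbsent` after the stat is updated would be a readability change, not a functional one.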

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
##########
@@ -182,14 +182,28 @@ public abstract void preCompact(
        .withOperationField(config.allowOperationMetadataField())
        .withPartition(operation.getPartitionPath())
        .build();
-    if (!scanner.iterator().hasNext()) {
-      scanner.close();
-      return new ArrayList<>();
-    }

    Option<HoodieBaseFile> oldDataFileOpt =
        operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());

+    // Consider the following scenario: if all log blocks in this fileSlice are rolled back, the scanner is empty.
+    // But in that case we still need to keep the base file; otherwise the following fileSlice will lose it.
+    if (!scanner.iterator().hasNext()) {
+      if (!oldDataFileOpt.isPresent()) {
+        scanner.close();
+        return new ArrayList<>();
+      } else {
+        // TODO: we may directly rename the original parquet file if there is no schema evolution/devolution

Review comment:
       Do we know whether scanner.getRecords() returns an empty map, and not null, when scanner.iterator().hasNext() is false? With the fix as put up, we might end up calling
   ```
   compactionHandler.handleUpdate(instantTime, operation.getPartitionPath(),
             operation.getFileId(), scanner.getRecords(),
             oldDataFileOpt.get());
   ```
   at L211 when all log files are to be rolled back and only the base file is to be compacted.
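   On the empty-map-vs-null question, a defensive guard could sidestep it entirely. A sketch with a hypothetical helper (not from the PR; whether `scanner.getRecords()` can actually return null would still need to be checked against the scanner implementation):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class NullSafeRecords {
  // Hypothetical guard: normalize a possibly-null record map to an empty map.
  static <K, V> Map<K, V> nonNullRecords(Map<K, V> records) {
    return records == null ? Collections.emptyMap() : records;
  }

  public static void main(String[] args) {
    Map<String, String> fromEmptyScanner = null; // worst case for an empty scanner
    System.out.println(nonNullRecords(fromEmptyScanner).isEmpty()); // prints true

    Map<String, String> nonEmpty = new HashMap<>();
    nonEmpty.put("key", "value");
    System.out.println(nonNullRecords(nonEmpty).size()); // prints 1
  }
}
```

   Passing the normalized map into `handleUpdate` would make the empty-scanner path safe regardless of the scanner's contract.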

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
##########
@@ -169,6 +178,8 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context)

    for (String partitionPath : partitionPaths) {
      WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
+      WorkloadStat executionStat = executionProfile.getOrDefault(partitionPath, new WorkloadStat());
+      executionProfile.putIfAbsent(partitionPath, executionStat);

Review comment:
       Should we move this to L244 or so, i.e., update executionStat first and then update the executionProfile map?

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
##########
@@ -140,6 +158,138 @@ public void testMergeOnReadRollbackActionExecutor(boolean isUsingMarkers) throws
     assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, "002").doesMarkerDirExist());
   }
 
+  @Test
+  public void testRollbackForCanIndexLogFile() throws IOException {
+    cleanupResources();
+    setUpDFS();
+    // 1. prepare data and assert the result
+    // generate just one partition
+    dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH});
+    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
+        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+        .withWriteStatusClass(MetadataMergeWriteStatus.class)
+        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
+        .forTable("test-trip-table")
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+        .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withEnableBackupForRemoteFileSystemView(false) // Fail test if problem connecting to timeline-server
+            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()).withRollbackUsingMarkers(false).withAutoCommit(false).build();
+
+    // prepare data
+    HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH}, basePath);
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    /**
+     * Write 1 (only inserts)
+     */
+    String newCommitTime = "001";
+    client.startCommitWithTime(newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInsertsForPartition(newCommitTime, 2, DEFAULT_FIRST_PARTITION_PATH);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
+    org.apache.hudi.testutils.Assertions.assertNoWriteErrors(statuses.collect());
+    client.commit(newCommitTime, statuses);
+
+    // check fileSlice
+    HoodieTable table = this.getHoodieTable(metaClient, cfg);
+    SyncableFileSystemView fsView = getFileSystemViewWithUnCommittedSlices(table.getMetaClient());
+    List<HoodieFileGroup> firstPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
+    assertEquals(1, firstPartitionCommit2FileGroups.size());
+    assertEquals(1, (int) firstPartitionCommit2FileGroups.get(0).getAllFileSlices().count());
+    assertFalse(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().findFirst().get().getBaseFile().isPresent());
+    assertEquals(1, firstPartitionCommit2FileGroups.get(0).getAllFileSlices().findFirst().get().getLogFiles().count());
+    String generatedFileID = firstPartitionCommit2FileGroups.get(0).getFileGroupId().getFileId();
+
+    // check hoodieCommitMeta
+    HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+        table.getMetaClient().getCommitTimeline()
+            .getInstantDetails(new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "001"))
+            .get(),
+        HoodieCommitMetadata.class);
+    List<HoodieWriteStat> firstPartitionWriteStat = commitMetadata.getPartitionToWriteStats().get(DEFAULT_FIRST_PARTITION_PATH);
+    assertEquals(2, firstPartitionWriteStat.size());
+    // we have an empty writeStat for every partition
+    assert firstPartitionWriteStat.stream().anyMatch(wStat -> StringUtils.isNullOrEmpty(wStat.getFileId()));
+    // we have one non-empty writeStat, which must contain inserts or updates
+    assertEquals(1, firstPartitionWriteStat.stream().filter(wStat -> !StringUtils.isNullOrEmpty(wStat.getFileId())).count());
+    firstPartitionWriteStat.stream().filter(wStat -> !StringUtils.isNullOrEmpty(wStat.getFileId())).forEach(wStat -> {
+      assert wStat.getNumInserts() > 0;
+    });
+
+    /**
+     * Write 2 (updates and inserts)
+     */
+    newCommitTime = "002";
+    client.startCommitWithTime(newCommitTime);
+    List<HoodieRecord> updateRecords = Collections.singletonList(dataGen.generateUpdateRecord(records.get(0).getKey(), newCommitTime));
+    List<HoodieRecord> insertRecordsInSamePartition = dataGen.generateInsertsForPartition(newCommitTime, 2, DEFAULT_FIRST_PARTITION_PATH);
+    List<HoodieRecord> insertRecordsInOtherPartition = dataGen.generateInsertsForPartition(newCommitTime, 2, DEFAULT_SECOND_PARTITION_PATH);
+    List<HoodieRecord> recordsToBeWrite = Stream.concat(Stream.concat(updateRecords.stream(), insertRecordsInSamePartition.stream()), insertRecordsInOtherPartition.stream())
+        .collect(Collectors.toList());
+    writeRecords = jsc.parallelize(recordsToBeWrite, 1);
+    statuses = client.upsert(writeRecords, newCommitTime);
+    client.commit(newCommitTime, statuses);
+    table = this.getHoodieTable(metaClient, cfg);
+    commitMetadata = HoodieCommitMetadata.fromBytes(
+        table.getMetaClient().getCommitTimeline()
+            .getInstantDetails(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime))
+            .get(),
+        HoodieCommitMetadata.class);
+    assert commitMetadata.getPartitionToWriteStats().containsKey(DEFAULT_FIRST_PARTITION_PATH);
+    assert commitMetadata.getPartitionToWriteStats().containsKey(DEFAULT_SECOND_PARTITION_PATH);
+    List<HoodieWriteStat> hoodieWriteStatOptionList = commitMetadata.getPartitionToWriteStats().get(DEFAULT_FIRST_PARTITION_PATH);
+    // Both update and insert records should enter the same existing fileGroup due to small file handling
+    assertEquals(1, hoodieWriteStatOptionList.size());
+    assertEquals(generatedFileID, hoodieWriteStatOptionList.get(0).getFileId());
+    // check insert and update counts
+    assertEquals(2, hoodieWriteStatOptionList.get(0).getNumInserts());
+    assertEquals(1, hoodieWriteStatOptionList.get(0).getNumUpdateWrites());
+
+    List<HoodieWriteStat> secondHoodieWriteStatOptionList = commitMetadata.getPartitionToWriteStats().get(DEFAULT_SECOND_PARTITION_PATH);
+    // All inserts should enter one fileGroup
+    assertEquals(1, secondHoodieWriteStatOptionList.size());
+    String fileIdInPartitionTwo = secondHoodieWriteStatOptionList.get(0).getFileId();
+    assertEquals(2, secondHoodieWriteStatOptionList.get(0).getNumInserts());
+
+    /**
+     * Rollback
+     */
+    HoodieInstant rollBackInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
+    BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
+        new BaseRollbackPlanActionExecutor(context, cfg, table, "003", rollBackInstant, false,
+            cfg.shouldRollbackUsingMarkers());
+    mergeOnReadRollbackPlanActionExecutor.execute().get();
+    MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
+        context,
+        cfg,
+        table,
+        "003",
+        rollBackInstant,
+        true,
+        false);
+
+    // 3. assert the rollback stats
+    Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata();
+    assertEquals(2, rollbackMetadata.size());
+
+    // 4. assert the file groups after rollback, and compare against the rollback stats
+    // assert the first partition's data and log file counts
+    HoodieRollbackPartitionMetadata partitionMetadata = rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH);
+    assertTrue(partitionMetadata.getSuccessDeleteFiles().isEmpty());
+    assertTrue(partitionMetadata.getFailedDeleteFiles().isEmpty());
+    assertEquals(1, partitionMetadata.getRollbackLogFiles().size());
+    assertEquals(1, partitionMetadata.getWrittenLogFiles().size());
+
+    // assert the second partition's data and log file counts
+    partitionMetadata = rollbackMetadata.get(DEFAULT_SECOND_PARTITION_PATH);
+    assertEquals(1, partitionMetadata.getSuccessDeleteFiles().size());
+    assertTrue(partitionMetadata.getFailedDeleteFiles().isEmpty());
+    assertTrue(partitionMetadata.getRollbackLogFiles().isEmpty());

Review comment:
       Can you add a read query at the end to validate that Hudi returns only valid records after this rollback?
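   As a sketch of what that validation could assert (plain Java sets standing in for an actual read of the table; the record keys here are hypothetical): after rolling back commit 002, a read should see exactly the records committed in 001 and none of the rolled-back ones.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class RollbackReadValidation {
  public static void main(String[] args) {
    // Hypothetical record keys written by each commit.
    Set<String> commit001Keys = new HashSet<>(Arrays.asList("key-1", "key-2"));
    Set<String> commit002Keys = new HashSet<>(Arrays.asList("key-3", "key-4"));

    // After rolling back 002, the visible key set should be exactly the 001 keys.
    Set<String> visibleAfterRollback = new HashSet<>(commit001Keys);
    System.out.println(visibleAfterRollback.equals(commit001Keys)); // prints true

    // None of the rolled-back keys should remain visible.
    Set<String> leaked = new HashSet<>(visibleAfterRollback);
    leaked.retainAll(commit002Keys);
    System.out.println(leaked.isEmpty()); // prints true
  }
}
```

   In the test itself, the visible set would come from an actual read of the table (e.g. collecting record keys) and be compared against the keys inserted in commit "001".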

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
##########
@@ -154,19 +154,21 @@ private void initKeyGenIfNeeded(boolean populateMetaFields) {
       LOG.info("RDD PreppedRecords was persisted at: " + inputRecordsRDD.getStorageLevel());
     }
 
-    WorkloadProfile profile = null;
+    WorkloadProfile inputProfile = null;
     if (isWorkloadProfileNeeded()) {
       context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile");
-      profile = new WorkloadProfile(buildProfile(inputRecordsRDD), operationType);
-      LOG.info("Workload profile :" + profile);
-      saveWorkloadProfileMetadataToInflight(profile, instantTime);
+      inputProfile = new WorkloadProfile(buildProfile(inputRecordsRDD), operationType);
+      LOG.info("Input workload profile :" + inputProfile);
     }
 
     // handle records update with clustering
     JavaRDD<HoodieRecord<T>> inputRecordsRDDWithClusteringUpdate = clusteringHandleUpdate(inputRecordsRDD);
 
     // partition using the insert partitioner
-    final Partitioner partitioner = getPartitioner(profile);
+    final Partitioner partitioner = getPartitioner(inputProfile);
+    WorkloadProfile executionProfile = ((org.apache.hudi.table.action.commit.Partitioner) partitioner).getExecutionWorkloadProfile();

Review comment:
       I understand isWorkloadProfileNeeded() is always true for all engines, but just to retain the old behavior, can we place these two lines within the if block?
   ```
   if (isWorkloadProfileNeeded()) {
     WorkloadProfile executionProfile = ((org.apache.hudi.table.action.commit.Partitioner) partitioner).getExecutionWorkloadProfile();
     saveWorkloadProfileMetadataToInflight(executionProfile, instantTime);
   }
   ```

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
##########
@@ -182,14 +182,28 @@ public abstract void preCompact(
        .withOperationField(config.allowOperationMetadataField())
        .withPartition(operation.getPartitionPath())
        .build();
-    if (!scanner.iterator().hasNext()) {
-      scanner.close();
-      return new ArrayList<>();
-    }

    Option<HoodieBaseFile> oldDataFileOpt =
        operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());

+    // Consider the following scenario: if all log blocks in this fileSlice are rolled back, the scanner is empty.
+    // But in that case we still need to keep the base file; otherwise the following fileSlice will lose it.
+    if (!scanner.iterator().hasNext()) {
+      if (!oldDataFileOpt.isPresent()) {
+        scanner.close();
+        return new ArrayList<>();
+      } else {
+        // TODO: we may directly rename the original parquet file if there is no schema evolution/devolution

Review comment:
       Do these need to be cleaned up?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
