guanziyue commented on code in PR #9523:
URL: https://github.com/apache/hudi/pull/9523#discussion_r1305909875


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -104,6 +124,84 @@ public boolean commit(String instantTime, 
JavaRDD<WriteStatus> writeStatuses, Op
     return commitStats(instantTime, HoodieJavaRDD.of(writeStatuses), 
writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, 
extraPreCommitFunc);
   }
 
+  protected void commit(HoodieTable table, String commitActionType, String 
instantTime, HoodieCommitMetadata metadata,
+                        List<HoodieWriteStat> stats, HoodieData<WriteStatus> 
writeStatuses) throws IOException {
+    LOG.info("Committing " + instantTime + " action " + commitActionType);
+    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    HoodieCommitMetadata fixedCommitMetadata = 
addMissingLogFileIfNeeded(table, commitActionType, instantTime, metadata);
+    // Finalize write
+    finalizeWrite(table, instantTime, stats);
+    // save the internal schema to support implicitly adding columns during the write process
+    if 
(!fixedCommitMetadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
+        && fixedCommitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY) && 
table.getConfig().getSchemaEvolutionEnable()) {
+      saveInternalSchema(table, instantTime, fixedCommitMetadata);
+    }
+    // update Metadata table
+    writeTableMetadata(table, instantTime, fixedCommitMetadata, writeStatuses);
+    activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, 
instantTime),
+        
Option.of(fixedCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+  }
+
+  /* In a Spark MOR table, any failed Spark task may generate log files that are not included in the write status.
+   * We need to add these to the CommitMetadata so that they are synced to the MDT and the MDT has correct file info.
+   */
+  private HoodieCommitMetadata addMissingLogFileIfNeeded(HoodieTable table, 
String commitActionType, String instantTime,
+                                                         HoodieCommitMetadata 
commitMetadata) throws IOException {
+    if 
(!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ)
+        || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION)) 
{
+      return commitMetadata;
+    }
+
+    HoodieCommitMetadata metadata = commitMetadata;
+    WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(), 
table, instantTime);
+    // if there are log files in this delta commit, search for any invalid log files generated by failed Spark tasks
+    boolean hasLogFileInDeltaCommit = metadata.getPartitionToWriteStats()
+        .values().stream().flatMap(List::stream)
+        .anyMatch(writeStat -> FSUtils.isLogFile(new 
Path(config.getBasePath(), writeStat.getPath()).getName()));
+    if (hasLogFileInDeltaCommit) {
+      // get all log files generated by log mark file
+      Set<String> logFilesMarkerPath = new 
HashSet<>(markers.appendedLogPaths(context, 
config.getFinalizeWriteParallelism()));
+
+      // remove valid log files
+      for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats : 
metadata.getPartitionToWriteStats().entrySet()) {
+        for (HoodieWriteStat hoodieWriteStat : 
partitionAndWriteStats.getValue()) {
+          logFilesMarkerPath.remove(hoodieWriteStat.getPath());
+        }
+      }
+
+      // the remaining entries are invalid log files; generate write stats for them
+      if (logFilesMarkerPath.size() > 0) {
+        context.setJobStatus(this.getClass().getSimpleName(), "generate 
writeStat for missing log files");
+        List<Option<HoodieDeltaWriteStat>> fakeLogFileWriteStat = 
context.map(new ArrayList<>(logFilesMarkerPath), (logFilePath) -> {

Review Comment:
   fixed by new commit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to