nsivabalan commented on code in PR #9553:
URL: https://github.com/apache/hudi/pull/9553#discussion_r1309184030
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -104,6 +129,162 @@ public boolean commit(String instantTime,
JavaRDD<WriteStatus> writeStatuses, Op
return commitStats(instantTime, HoodieJavaRDD.of(writeStatuses),
writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds,
extraPreCommitFunc);
}
/**
 * Commits a completed write on the given table: reconciles the commit metadata,
 * finalizes the write, optionally persists the internal schema, updates the
 * metadata table, and finally transitions the instant to COMPLETED on the
 * active timeline.
 *
 * <p>NOTE(review): the statement order below is significant — the metadata is
 * fixed up before finalize/MDT sync, and {@code saveAsComplete} is the last
 * step so the instant only becomes visible once everything else succeeded.
 *
 * @param table            the Hoodie table being committed to
 * @param commitActionType timeline action type (e.g. commit / deltacommit)
 * @param instantTime      instant being committed
 * @param metadata         commit metadata produced by the write
 * @param stats            per-file write statistics used to finalize the write
 * @param writeStatuses    write statuses, passed through to the metadata-table sync
 * @throws IOException if serializing the metadata or timeline I/O fails
 */
protected void commit(HoodieTable table, String commitActionType, String instantTime, HoodieCommitMetadata metadata,
                      List<HoodieWriteStat> stats, HoodieData<WriteStatus> writeStatuses) throws IOException {
  LOG.info("Committing " + instantTime + " action " + commitActionType);
  HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
  // Reconcile the metadata first: for MOR delta commits this adds write stats
  // for log files produced by failed tasks that are missing from write statuses.
  HoodieCommitMetadata fixedCommitMetadata = addMissingLogFileIfNeeded(table, commitActionType, instantTime, metadata);
  // Finalize write
  finalizeWrite(table, instantTime, stats);
  // Save the internal schema so that columns implicitly added during the write
  // process are tracked — only when schema evolution is enabled and no
  // latest-schema entry has been recorded yet but a schema key is present.
  if (!fixedCommitMetadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
      && fixedCommitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY) && table.getConfig().getSchemaEvolutionEnable()) {
    saveInternalSchema(table, instantTime, fixedCommitMetadata);
  }
  // update Metadata table
  writeTableMetadata(table, instantTime, fixedCommitMetadata, writeStatuses);
  // Persist the reconciled metadata and mark the instant complete on the timeline.
  activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime),
      Option.of(fixedCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
}
+
+ /* In spark mor table, any failed spark task may generate log files which
are not included in write status.
+   * We need to add these to the CommitMetadata so that they are synced to the MDT
+   * and the MDT ends up with correct file info.
+   */
+ private HoodieCommitMetadata addMissingLogFileIfNeeded(HoodieTable table,
String commitActionType, String instantTime,
+ HoodieCommitMetadata
commitMetadata) throws IOException {
+ if
(!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ)
+ || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION))
{
+ return commitMetadata;
+ }
+
+ HoodieCommitMetadata metadata = commitMetadata;
+ WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(),
table, instantTime);
+ // if there is log files in this delta commit, we search any invalid log
files generated by failed spark task
+ boolean hasLogFileInDeltaCommit = metadata.getPartitionToWriteStats()
+ .values().stream().flatMap(List::stream)
+ .anyMatch(writeStat -> FSUtils.isLogFile(new
Path(config.getBasePath(), writeStat.getPath()).getName()));
+ if (hasLogFileInDeltaCommit) {
+ // get all log files generated by log mark file
+ Set<String> logFilesMarkerPath = new
HashSet<>(markers.getAppendedLogPaths(context,
config.getFinalizeWriteParallelism()));
+
+ // remove valid log files
+ for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats :
metadata.getPartitionToWriteStats().entrySet()) {
+ for (HoodieWriteStat hoodieWriteStat :
partitionAndWriteStats.getValue()) {
+ logFilesMarkerPath.remove(hoodieWriteStat.getPath());
+ }
+ }
+
+ // remaining are log files generated by failed spark task, let's
generate write stat for them
+ if (logFilesMarkerPath.size() > 0) {
+ // populate partition -> map (fileId -> HoodieWriteStat) // we just
need one write stat per fileID to fetch some info about the file slice of
interest when we want to add a new WriteStat.
+ List<Pair<String, Map<String, HoodieWriteStat>>>
partitionToFileIdAndWriteStatList = new ArrayList<>();
+ for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats :
metadata.getPartitionToWriteStats().entrySet()) {
+ String partition = partitionAndWriteStats.getKey();
+ Map<String, HoodieWriteStat> fileIdToWriteStat = new HashMap<>();
+ partitionAndWriteStats.getValue().forEach(writeStat -> {
+ String fileId = writeStat.getFileId();
+ if (!fileIdToWriteStat.containsKey(fileId)) {
+ fileIdToWriteStat.put(fileId, writeStat);
+ }
+ });
+ partitionToFileIdAndWriteStatList.add(Pair.of(partition,
fileIdToWriteStat));
+ }
+
+ final Path basePath = new Path(config.getBasePath());
+ Map<String, Map<String, List<String>>>
partitionToFileIdAndMissingLogFiles = new HashMap<>();
+ logFilesMarkerPath
+ .stream()
+ .forEach(logFilePathStr -> {
+ Path logFileFullPath = new Path(config.getBasePath(),
logFilePathStr);
+ String fileID = FSUtils.getFileId(logFileFullPath.getName());
+ String partitionPath =
FSUtils.getRelativePartitionPath(basePath, logFileFullPath.getParent());
+ if
(!partitionToFileIdAndMissingLogFiles.containsKey(partitionPath)) {
+ partitionToFileIdAndMissingLogFiles.put(partitionPath, new
HashMap<>());
+ }
+ if
(!partitionToFileIdAndMissingLogFiles.get(partitionPath).containsKey(fileID)) {
+
partitionToFileIdAndMissingLogFiles.get(partitionPath).put(fileID, new
ArrayList<>());
+ }
+
partitionToFileIdAndMissingLogFiles.get(partitionPath).get(fileID).add(logFilePathStr);
+ });
+
+ context.setJobStatus(this.getClass().getSimpleName(), "generate
writeStat for missing log files");
+
+ // populate partition -> map (fileId -> List <missing log file>)
+ List<Map.Entry<String, Map<String, List<String>>>> missingFilesInfo =
partitionToFileIdAndMissingLogFiles.entrySet().stream().collect(Collectors.toList());
+ HoodiePairData<String, Map<String, List<String>>>
partitionToMissingLogFilesHoodieData =
context.parallelize(missingFilesInfo).mapToPair(
+ (SerializablePairFunction<Map.Entry<String, Map<String,
List<String>>>, String, Map<String, List<String>>>) t -> Pair.of(t.getKey(),
t.getValue()));
+
+ // populate partition -> map (fileId -> HoodieWriteStat) // we just
need one write stat per fileId to fetch some info about the file slice of
interest.
+ HoodiePairData<String, Map<String, HoodieWriteStat>>
partitionToWriteStatHoodieData =
context.parallelize(partitionToFileIdAndWriteStatList).mapToPair(
+ (SerializablePairFunction<Pair<String, Map<String,
HoodieWriteStat>>, String, Map<String, HoodieWriteStat>>) t -> t);
+
+ SerializableConfiguration serializableConfiguration = new
SerializableConfiguration(hadoopConf);
+
+ // lets do left outer join to add write stats for missing log files
+ List<Pair<String, List<HoodieWriteStat>>> additionalLogFileWriteStat =
partitionToWriteStatHoodieData
+ .join(partitionToMissingLogFilesHoodieData)
+ .map((SerializableFunction<Pair<String, Pair<Map<String,
HoodieWriteStat>, Map<String, List<String>>>>, Pair<String,
List<HoodieWriteStat>>>) v1 -> {
+ final Path basePathLocal = new Path(config.getBasePath());
+ String partitionPath = v1.getKey();
+ Map<String, HoodieWriteStat> fileIdToOriginalWriteStat =
v1.getValue().getKey();
+ Map<String, List<String>> missingFileIdToLogFilesList =
v1.getValue().getValue();
+
+ List<HoodieWriteStat> missingWriteStats = new ArrayList();
+ List<String> missingLogFilesForPartition = new ArrayList();
+ missingFileIdToLogFilesList.values().forEach(entry ->
missingLogFilesForPartition.addAll(entry));
+
+ // fetch file sizes for missing log files
+ Path fullPartitionPath = new Path(config.getBasePath(),
partitionPath);
+ FileSystem fileSystem =
fullPartitionPath.getFileSystem(serializableConfiguration.get());
+ List<Option<FileStatus>> fileStatues =
FSUtils.getFileStatusesUnderPartition(fileSystem, fullPartitionPath,
missingLogFilesForPartition, true);
Review Comment:
nope. there are some minor differences.
for rollback planning I agree anyways, we will need the file Status.
but during reconciliation of a regular delta commit, if every marker file returned
by the markers is present in the WriteStatus, we won't trigger any additional FS calls.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]