codope commented on code in PR #9553:
URL: https://github.com/apache/hudi/pull/9553#discussion_r1314253600
##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -313,6 +313,47 @@ public static Map<String, FileStatus[]>
getFilesInPartitions(HoodieEngineContext
}
}
+ public static List<Option<FileStatus>>
getFileStatusesUnderPartition(FileSystem fileSystem,
+ Path
partitionPathIncludeBasePath,
+
List<String> filesNamesUnderThisPartition,
+ boolean
ignoreMissingFiles) {
+ String fileSystemType = fileSystem.getScheme();
+ boolean useListStatus =
StorageSchemes.isListStatusFriendly(fileSystemType);
+ List<Option<FileStatus>> result = new
ArrayList<>(filesNamesUnderThisPartition.size());
+ try {
+ if (useListStatus) {
+ FileStatus[] fileStatuses =
fileSystem.listStatus(partitionPathIncludeBasePath,
+ path -> filesNamesUnderThisPartition.contains(path.getName()));
Review Comment:
How about having `filesNamesUnderThisPartition` as a Set? It wouldn't matter
for small lists, but lookups are more efficient with a set.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/CommitMetadataUtils.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieDeltaWriteStat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.marker.WriteMarkers;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CommitMetadataUtils {
+
+ /* In spark mor table, task retries may generate log files which are not
included in write status.
+ * We need to add these to CommitMetadata so that it will be synced to MDT.
+ */
+ public static HoodieCommitMetadata
reconcileMetadataForMissingFiles(HoodieTable table, String commitActionType,
String instantTime,
+
HoodieCommitMetadata commitMetadata, HoodieWriteConfig config,
+
HoodieEngineContext context, Configuration hadoopConf, String
classNameForContext) throws IOException {
+ if
(!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ)
+ || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION))
{
+ return commitMetadata;
+ }
+
+ WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(),
table, instantTime);
+ // if there is log files in this delta commit, we search any invalid log
files generated by failed spark task
+ boolean hasLogFileInDeltaCommit = commitMetadata.getPartitionToWriteStats()
+ .values().stream().flatMap(List::stream)
+ .anyMatch(writeStat -> FSUtils.isLogFile(new
Path(config.getBasePath(), writeStat.getPath()).getName()));
+ if (hasLogFileInDeltaCommit) { // skip for COW table
+ // get all log files generated by makers
+ Set<String> allLogFilesMarkerPath = new
HashSet<>(markers.getAppendedLogPaths(context,
config.getFinalizeWriteParallelism()));
+ Set<String> logFilesMarkerPath = new HashSet<>();
+ allLogFilesMarkerPath.stream().filter(logFilePath ->
!logFilePath.endsWith("cdc")).forEach(logFilePath ->
logFilesMarkerPath.add(logFilePath));
+
+ // remove valid log files
+ for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats :
commitMetadata.getPartitionToWriteStats().entrySet()) {
+ for (HoodieWriteStat hoodieWriteStat :
partitionAndWriteStats.getValue()) {
+ logFilesMarkerPath.remove(hoodieWriteStat.getPath());
+ }
+ }
+
+ // remaining are log files generated by retried spark task, let's
generate write stat for them
+ if (logFilesMarkerPath.size() > 0) {
+ SerializableConfiguration serializableConfiguration = new
SerializableConfiguration(hadoopConf);
+ context.setJobStatus(classNameForContext, "Preparing data for missing
files to assit with generatin write stats");
+ // populate partition -> map (fileId -> HoodieWriteStat) // we just
need one write stat per fileID to fetch some info about
+ // the file slice of interest to populate WriteStat.
+ HoodiePairData<String, Map<String, HoodieWriteStat>>
partitionToWriteStatHoodieData = getPartitionToFileIdToFilesMap(commitMetadata,
context);
+
+ String localBasePath = config.getBasePath();
+ // populate partition -> map (fileId -> List <missing log file names>)
+ HoodiePairData<String, Map<String, List<String>>>
partitionToMissingLogFilesHoodieData =
+ getPartitionToFileIdToMissingLogFileMap(localBasePath,
logFilesMarkerPath, context, config.getFileListingParallelism());
+
+ // TODO: make one call per partition to fetch file sizes.
+ context.setJobStatus(classNameForContext, "Generating writeStat for
missing log files");
+
+ // lets join both to generate write stats for missing log files
+ List<Pair<String, List<HoodieWriteStat>>> additionalLogFileWriteStat =
partitionToWriteStatHoodieData
+ .join(partitionToMissingLogFilesHoodieData)
+ .map((SerializableFunction<Pair<String, Pair<Map<String,
HoodieWriteStat>, Map<String, List<String>>>>, Pair<String,
List<HoodieWriteStat>>>) v1 -> {
+ final Path basePathLocal = new Path(localBasePath);
+ String partitionPath = v1.getKey();
+ Map<String, HoodieWriteStat> fileIdToOriginalWriteStat =
v1.getValue().getKey();
+ Map<String, List<String>> missingFileIdToLogFileNames =
v1.getValue().getValue();
+ List<String> missingLogFileNames = new ArrayList<>();
+ missingFileIdToLogFileNames.values().forEach(logfiles ->
missingLogFileNames.addAll(logfiles));
Review Comment:
Can simplify using flatMap
```
List<String> missingLogFileNames =
missingFileIdToLogFileNames.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList());
```
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java:
##########
@@ -162,24 +199,126 @@ List<Pair<String, HoodieRollbackStat>>
maybeDeleteAndCollectStats(HoodieEngineCo
1L
);
+ // With listing based rollback, sometimes we only get the fileID of
interest(so that we can add rollback command block) w/o the actual file name.
+ // So, we want to ignore such invalid files from this list before we
add it to the rollback stats.
+ String partitionFullPath = metaClient.getBasePathV2().toString();
+ Map<String, Long> validLogBlocksToDelete = new HashMap<>();
+
rollbackRequest.getLogBlocksToBeDeleted().entrySet().stream().forEach((kv) -> {
+ String logFileFullPath = kv.getKey();
+ String logFileName = logFileFullPath.replace(partitionFullPath, "");
+ if (!StringUtils.isNullOrEmpty(logFileName)) {
+ validLogBlocksToDelete.put(kv.getKey(), kv.getValue());
+ }
+ });
+
return Collections.singletonList(
- Pair.of(rollbackRequest.getPartitionPath(),
- HoodieRollbackStat.newBuilder()
- .withPartitionPath(rollbackRequest.getPartitionPath())
- .withRollbackBlockAppendResults(filesToNumBlocksRollback)
- .build()))
+ Pair.of(rollbackRequest.getPartitionPath(),
+ HoodieRollbackStat.newBuilder()
+ .withPartitionPath(rollbackRequest.getPartitionPath())
+
.withRollbackBlockAppendResults(filesToNumBlocksRollback)
+ .withLogFilesFromFailedCommit(validLogBlocksToDelete)
+ .build()))
.stream();
} else {
return Collections.singletonList(
- Pair.of(rollbackRequest.getPartitionPath(),
- HoodieRollbackStat.newBuilder()
- .withPartitionPath(rollbackRequest.getPartitionPath())
- .build()))
+ Pair.of(rollbackRequest.getPartitionPath(),
+ HoodieRollbackStat.newBuilder()
+ .withPartitionPath(rollbackRequest.getPartitionPath())
+ .build()))
.stream();
}
}, numPartitions);
}
+ private HoodieLogFileWriteCallback getRollbackLogMarkerCallback(final
WriteMarkers writeMarkers, String partitionPath, String fileId) {
+ return new HoodieLogFileWriteCallback() {
+ @Override
+ public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
+ // there may be existed marker file if fs support append. So always
return true;
+ createAppendMarker(logFileToAppend);
+ return true;
+ }
+
+ @Override
+ public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
+ return createAppendMarker(logFileToCreate);
+ }
+
+ private boolean createAppendMarker(HoodieLogFile logFileToAppend) {
+ return writeMarkers.createIfNotExists(partitionPath,
logFileToAppend.getFileName(), IOType.APPEND,
+ config, fileId, metaClient.getActiveTimeline()).isPresent();
+ }
+ };
+ }
+
+ /**
+ * If there are log files created by previous rollback attempts, we want to
add them to rollback stats so that MDT is able to track them.
+ * @param context
+ * @param originalRollbackStats
+ * @param logPaths
+ * @return
+ */
+ private List<HoodieRollbackStat>
addLogFilesFromPreviousFailedRollbacksToStat(HoodieEngineContext context,
Review Comment:
consider splitting the method into smaller methods to make the code much
more readable and maintainable.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/CommitMetadataUtils.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieDeltaWriteStat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.marker.WriteMarkers;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CommitMetadataUtils {
+
+ /* In spark mor table, task retries may generate log files which are not
included in write status.
+ * We need to add these to CommitMetadata so that it will be synced to MDT.
+ */
+ public static HoodieCommitMetadata
reconcileMetadataForMissingFiles(HoodieTable table, String commitActionType,
String instantTime,
+
HoodieCommitMetadata commitMetadata, HoodieWriteConfig config,
+
HoodieEngineContext context, Configuration hadoopConf, String
classNameForContext) throws IOException {
+ if
(!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ)
+ || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION))
{
+ return commitMetadata;
+ }
+
+ WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(),
table, instantTime);
+ // if there is log files in this delta commit, we search any invalid log
files generated by failed spark task
+ boolean hasLogFileInDeltaCommit = commitMetadata.getPartitionToWriteStats()
+ .values().stream().flatMap(List::stream)
+ .anyMatch(writeStat -> FSUtils.isLogFile(new
Path(config.getBasePath(), writeStat.getPath()).getName()));
+ if (hasLogFileInDeltaCommit) { // skip for COW table
+ // get all log files generated by makers
+ Set<String> allLogFilesMarkerPath = new
HashSet<>(markers.getAppendedLogPaths(context,
config.getFinalizeWriteParallelism()));
+ Set<String> logFilesMarkerPath = new HashSet<>();
+ allLogFilesMarkerPath.stream().filter(logFilePath ->
!logFilePath.endsWith("cdc")).forEach(logFilePath ->
logFilesMarkerPath.add(logFilePath));
+
+ // remove valid log files
+ for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats :
commitMetadata.getPartitionToWriteStats().entrySet()) {
+ for (HoodieWriteStat hoodieWriteStat :
partitionAndWriteStats.getValue()) {
+ logFilesMarkerPath.remove(hoodieWriteStat.getPath());
+ }
+ }
+
+ // remaining are log files generated by retried spark task, let's
generate write stat for them
+ if (logFilesMarkerPath.size() > 0) {
+ SerializableConfiguration serializableConfiguration = new
SerializableConfiguration(hadoopConf);
+ context.setJobStatus(classNameForContext, "Preparing data for missing
files to assit with generatin write stats");
+ // populate partition -> map (fileId -> HoodieWriteStat) // we just
need one write stat per fileID to fetch some info about
+ // the file slice of interest to populate WriteStat.
+ HoodiePairData<String, Map<String, HoodieWriteStat>>
partitionToWriteStatHoodieData = getPartitionToFileIdToFilesMap(commitMetadata,
context);
+
+ String localBasePath = config.getBasePath();
+ // populate partition -> map (fileId -> List <missing log file names>)
+ HoodiePairData<String, Map<String, List<String>>>
partitionToMissingLogFilesHoodieData =
+ getPartitionToFileIdToMissingLogFileMap(localBasePath,
logFilesMarkerPath, context, config.getFileListingParallelism());
+
+ // TODO: make one call per partition to fetch file sizes.
+ context.setJobStatus(classNameForContext, "Generating writeStat for
missing log files");
+
+ // lets join both to generate write stats for missing log files
+ List<Pair<String, List<HoodieWriteStat>>> additionalLogFileWriteStat =
partitionToWriteStatHoodieData
+ .join(partitionToMissingLogFilesHoodieData)
+ .map((SerializableFunction<Pair<String, Pair<Map<String,
HoodieWriteStat>, Map<String, List<String>>>>, Pair<String,
List<HoodieWriteStat>>>) v1 -> {
+ final Path basePathLocal = new Path(localBasePath);
+ String partitionPath = v1.getKey();
+ Map<String, HoodieWriteStat> fileIdToOriginalWriteStat =
v1.getValue().getKey();
+ Map<String, List<String>> missingFileIdToLogFileNames =
v1.getValue().getValue();
+ List<String> missingLogFileNames = new ArrayList<>();
+ missingFileIdToLogFileNames.values().forEach(logfiles ->
missingLogFileNames.addAll(logfiles));
+
+ // fetch file sizes from FileSystem
+ Path fullPartitionPath =
StringUtils.isNullOrEmpty(partitionPath) ? new Path(localBasePath) : new
Path(localBasePath, partitionPath);
+ FileSystem fileSystem =
fullPartitionPath.getFileSystem(serializableConfiguration.get());
+ List<Option<FileStatus>> fileStatuesOpt =
FSUtils.getFileStatusesUnderPartition(fileSystem, fullPartitionPath,
missingLogFileNames, true);
+ List<FileStatus> fileStatuses =
fileStatuesOpt.stream().filter(fileStatusOpt ->
fileStatusOpt.isPresent()).map(fileStatusOption ->
fileStatusOption.get()).collect(Collectors.toList());
+
+ // populate fileId -> List<FileStatus>
+ Map<String, List<FileStatus>> missingFileIdToLogFilesList = new
HashMap<>();
+ fileStatuses.forEach(fileStatus -> {
+ String fileId =
FSUtils.getFileIdFromLogPath(fileStatus.getPath());
+ if (!missingFileIdToLogFilesList.containsKey(fileId)) {
+ missingFileIdToLogFilesList.put(fileId, new ArrayList<>());
+ }
+ missingFileIdToLogFilesList.get(fileId).add(fileStatus);
+ });
+
+ List<HoodieWriteStat> missingWriteStats = new ArrayList();
+ missingFileIdToLogFilesList.forEach((k, logFileStatuses) -> {
+ String fileId = k;
+ HoodieDeltaWriteStat originalWriteStat =
+ (HoodieDeltaWriteStat)
fileIdToOriginalWriteStat.get(fileId); // are there chances that there won't be
any write stat in original list?
+ logFileStatuses.forEach(fileStatus -> {
+ // for every missing file, add a new HoodieDeltaWriteStat
+ HoodieDeltaWriteStat writeStat = new HoodieDeltaWriteStat();
+ HoodieLogFile logFile = new HoodieLogFile(fileStatus);
+ writeStat.setPath(basePathLocal, logFile.getPath());
+ writeStat.setPartitionPath(partitionPath);
+ writeStat.setFileId(fileId);
+ writeStat.setTotalWriteBytes(logFile.getFileSize());
+ writeStat.setFileSizeInBytes(logFile.getFileSize());
+ writeStat.setLogVersion(logFile.getLogVersion());
+ List<String> logFiles = new
ArrayList<>(originalWriteStat.getLogFiles());
+ logFiles.add(logFile.getFileName());
+ writeStat.setLogFiles(logFiles);
+ writeStat.setBaseFile(originalWriteStat.getBaseFile());
+ writeStat.setPrevCommit(logFile.getBaseCommitTime());
+ missingWriteStats.add(writeStat);
+ });
+ });
+ return Pair.of(partitionPath, missingWriteStats);
+ }).collectAsList();
+
+ // add these write stat to commit meta. deltaWriteStat can be empty
due to file missing. See code above to address FileNotFoundException
+ for (Pair<String, List<HoodieWriteStat>> partitionDeltaStats :
additionalLogFileWriteStat) {
+ String partitionPath = partitionDeltaStats.getKey();
+ partitionDeltaStats.getValue().forEach(ws ->
commitMetadata.addWriteStat(partitionPath, ws));
+ }
+ }
+ }
+ return commitMetadata;
+ }
+
+ private static HoodiePairData<String, Map<String, HoodieWriteStat>>
getPartitionToFileIdToFilesMap(HoodieCommitMetadata commitMetadata,
HoodieEngineContext context) {
+
+ List<Map.Entry<String, List<HoodieWriteStat>>> partitionToWriteStats = new
ArrayList(commitMetadata.getPartitionToWriteStats().entrySet());
+
+ return context.parallelize(partitionToWriteStats)
+ .mapToPair((SerializablePairFunction<Map.Entry<String,
List<HoodieWriteStat>>, String, Map<String, HoodieWriteStat>>) t -> {
+ Map<String, HoodieWriteStat> fileIdToWriteStat = new HashMap<>();
+ t.getValue().forEach(writeStat -> {
+ if (!fileIdToWriteStat.containsKey(writeStat.getFileId())) {
+ fileIdToWriteStat.put(writeStat.getFileId(), writeStat);
+ }
+ });
+ return Pair.of(t.getKey(), fileIdToWriteStat);
+ });
+ }
+
+ private static HoodiePairData<String, Map<String, List<String>>>
getPartitionToFileIdToMissingLogFileMap(String basePathStr, Set<String>
logFilesMarkerPath, HoodieEngineContext context,
Review Comment:
If I understand correctly, this method aims to convert a list of log file
paths into a mapping between partition paths and a second mapping between file
IDs and the corresponding missing log files. The method looks fine, but I guess
there are some opportunities for optimization. Instead of first converting
paths to pairs of partition paths and log file names, and then reducing them,
you can directly use `groupByKey` to group all log file paths by partition. And
then the conversion from log file paths to file IDs and their associated log
files can be a part of the map function post grouping.
##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -313,6 +313,47 @@ public static Map<String, FileStatus[]>
getFilesInPartitions(HoodieEngineContext
}
}
+ public static List<Option<FileStatus>>
getFileStatusesUnderPartition(FileSystem fileSystem,
+ Path
partitionPathIncludeBasePath,
+
List<String> filesNamesUnderThisPartition,
+ boolean
ignoreMissingFiles) {
+ String fileSystemType = fileSystem.getScheme();
+ boolean useListStatus =
StorageSchemes.isListStatusFriendly(fileSystemType);
+ List<Option<FileStatus>> result = new
ArrayList<>(filesNamesUnderThisPartition.size());
+ try {
+ if (useListStatus) {
+ FileStatus[] fileStatuses =
fileSystem.listStatus(partitionPathIncludeBasePath,
+ path -> filesNamesUnderThisPartition.contains(path.getName()));
+ for (String fileName : filesNamesUnderThisPartition) {
+ Option<FileStatus> ele = Option.fromJavaOptional(
+ Arrays.stream(fileStatuses)
+ .filter(fileStatus ->
fileStatus.getPath().getName().equals(fileName))
Review Comment:
Something as below:
```
if (useListStatus) {
FileStatus[] fileStatuses =
fileSystem.listStatus(partitionPathIncludeBasePath,
path -> filesNamesUnderThisPartition.contains(path.getName()));
Map<String, FileStatus> filenameToFileStatusMap =
Arrays.stream(fileStatuses)
.collect(Collectors.toMap(
fileStatus -> fileStatus.getPath().getName(),
fileStatus -> fileStatus
));
for (String fileName : filesNamesUnderThisPartition) {
if (filenameToFileStatusMap.containsKey(fileName)) {
result.add(Option.of(filenameToFileStatusMap.get(fileName)));
} else {
if (!ignoreMissingFiles) {
throw new FileNotFoundException("File not found: " + new
Path(partitionPathIncludeBasePath.toString(), fileName));
}
result.add(Option.empty());
}
}
}
```
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java:
##########
@@ -79,14 +93,28 @@ public List<HoodieRollbackStat>
performRollback(HoodieEngineContext context, Hoo
// stack trace:
https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8
// related stack overflow post:
https://issues.apache.org/jira/browse/SPARK-3601. Avro deserializes list as
GenericData.Array.
List<SerializableHoodieRollbackRequest> serializableRequests =
rollbackRequests.stream().map(SerializableHoodieRollbackRequest::new).collect(Collectors.toList());
- return context.reduceByKey(maybeDeleteAndCollectStats(context,
instantToRollback, serializableRequests, true, parallelism),
- RollbackUtils::mergeRollbackStat, parallelism);
+ WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(),
table, instantTime);
+
+ // Considering rollback may failed before, which generated some additional
log files. We need to add these log files back.
+ Set<String> logPaths = new HashSet<>();
+ try {
+ logPaths = markers.getAppendedLogPaths(context,
config.getFinalizeWriteParallelism());
Review Comment:
got it
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java:
##########
@@ -162,24 +199,126 @@ List<Pair<String, HoodieRollbackStat>>
maybeDeleteAndCollectStats(HoodieEngineCo
1L
);
+ // With listing based rollback, sometimes we only get the fileID of
interest(so that we can add rollback command block) w/o the actual file name.
+ // So, we want to ignore such invalid files from this list before we
add it to the rollback stats.
+ String partitionFullPath = metaClient.getBasePathV2().toString();
+ Map<String, Long> validLogBlocksToDelete = new HashMap<>();
+
rollbackRequest.getLogBlocksToBeDeleted().entrySet().stream().forEach((kv) -> {
+ String logFileFullPath = kv.getKey();
+ String logFileName = logFileFullPath.replace(partitionFullPath, "");
+ if (!StringUtils.isNullOrEmpty(logFileName)) {
+ validLogBlocksToDelete.put(kv.getKey(), kv.getValue());
+ }
+ });
+
return Collections.singletonList(
- Pair.of(rollbackRequest.getPartitionPath(),
- HoodieRollbackStat.newBuilder()
- .withPartitionPath(rollbackRequest.getPartitionPath())
- .withRollbackBlockAppendResults(filesToNumBlocksRollback)
- .build()))
+ Pair.of(rollbackRequest.getPartitionPath(),
+ HoodieRollbackStat.newBuilder()
+ .withPartitionPath(rollbackRequest.getPartitionPath())
+
.withRollbackBlockAppendResults(filesToNumBlocksRollback)
+ .withLogFilesFromFailedCommit(validLogBlocksToDelete)
+ .build()))
.stream();
} else {
return Collections.singletonList(
- Pair.of(rollbackRequest.getPartitionPath(),
- HoodieRollbackStat.newBuilder()
- .withPartitionPath(rollbackRequest.getPartitionPath())
- .build()))
+ Pair.of(rollbackRequest.getPartitionPath(),
+ HoodieRollbackStat.newBuilder()
+ .withPartitionPath(rollbackRequest.getPartitionPath())
+ .build()))
.stream();
}
}, numPartitions);
}
+ private HoodieLogFileWriteCallback getRollbackLogMarkerCallback(final
WriteMarkers writeMarkers, String partitionPath, String fileId) {
+ return new HoodieLogFileWriteCallback() {
+ @Override
+ public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
+ // there may be existed marker file if fs support append. So always
return true;
+ createAppendMarker(logFileToAppend);
+ return true;
+ }
+
+ @Override
+ public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
+ return createAppendMarker(logFileToCreate);
+ }
+
+ private boolean createAppendMarker(HoodieLogFile logFileToAppend) {
+ return writeMarkers.createIfNotExists(partitionPath,
logFileToAppend.getFileName(), IOType.APPEND,
+ config, fileId, metaClient.getActiveTimeline()).isPresent();
+ }
+ };
+ }
+
+ /**
+ * If there are log files created by previous rollback attempts, we want to
add them to rollback stats so that MDT is able to track them.
+ * @param context
+ * @param originalRollbackStats
+ * @param logPaths
+ * @return
+ */
+ private List<HoodieRollbackStat>
addLogFilesFromPreviousFailedRollbacksToStat(HoodieEngineContext context,
+
List<HoodieRollbackStat> originalRollbackStats,
+
Set<String> logPaths) {
+ if (logPaths.isEmpty()) {
+ // if rollback is not failed and re-attempted, we should not find any
additional log files here.
+ return originalRollbackStats;
+ }
+
+ final String basePathStr = config.getBasePath();
+ List<String> logFiles = new ArrayList<>(logPaths);
+ // populate partitionPath -> List<log file name>
+ HoodiePairData<String, List<String>> partitionPathToLogFilesHoodieData =
context.parallelize(logFiles)
+ // lets map each log file to partition path and log file name
+ .mapToPair((SerializablePairFunction<String, String, String>) t -> {
+ Path logFilePath = new Path(basePathStr, t);
+ String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePathStr), logFilePath.getParent());
+ return Pair.of(partitionPath, logFilePath.getName());
+ })
+ // lets group by partition path and collect it as log file list per
partition path
+ .groupByKey().mapToPair((SerializablePairFunction<Pair<String,
Iterable<String>>, String, List<String>>) t -> {
+ List<String> allFiles = new ArrayList<>();
+ t.getRight().forEach(entry -> allFiles.add(entry));
+ return Pair.of(t.getKey(), allFiles);
+ });
+
+ // populate partitionPath -> HoodieRollbackStat
+ HoodiePairData<String, HoodieRollbackStat>
partitionPathToRollbackStatsHoodieData =
+ context.parallelize(originalRollbackStats)
+ .mapToPair((SerializablePairFunction<HoodieRollbackStat, String,
HoodieRollbackStat>) t -> Pair.of(t.getPartitionPath(), t));
+
+ SerializableConfiguration serializableConfiguration = new
SerializableConfiguration(table.getHadoopConf());
Review Comment:
The Configuration isn't going to change, right? How about making the
`SerializableConfiguration` a class-level object so that it doesn't need to be
recreated for each method call?
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java:
##########
@@ -844,6 +847,47 @@ private Pair<List<HoodieRecord>, List<HoodieRecord>>
twoUpsertCommitDataWithTwoP
return Pair.of(records, records2);
}
+ /**
+ * Since how markers are generated for log file changed in Version Six, we
regenerate markers in the way version zero do.
+ *
+ * @param table instance of {@link HoodieTable}
+ */
+ private void prepForUpgradeFromZeroToOne(HoodieTable table) throws
IOException {
Review Comment:
+1 for adding this test.
##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -313,6 +313,47 @@ public static Map<String, FileStatus[]>
getFilesInPartitions(HoodieEngineContext
}
}
+ public static List<Option<FileStatus>>
getFileStatusesUnderPartition(FileSystem fileSystem,
+ Path
partitionPathIncludeBasePath,
+
List<String> filesNamesUnderThisPartition,
+ boolean
ignoreMissingFiles) {
+ String fileSystemType = fileSystem.getScheme();
+ boolean useListStatus =
StorageSchemes.isListStatusFriendly(fileSystemType);
+ List<Option<FileStatus>> result = new
ArrayList<>(filesNamesUnderThisPartition.size());
+ try {
+ if (useListStatus) {
+ FileStatus[] fileStatuses =
fileSystem.listStatus(partitionPathIncludeBasePath,
+ path -> filesNamesUnderThisPartition.contains(path.getName()));
+ for (String fileName : filesNamesUnderThisPartition) {
+ Option<FileStatus> ele = Option.fromJavaOptional(
+ Arrays.stream(fileStatuses)
+ .filter(fileStatus ->
fileStatus.getPath().getName().equals(fileName))
Review Comment:
Since you're already using a filter in line 326, once you list the statuses
with the given filename filter, there's no need to loop over the filenames
again and then filter the fetched statuses.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/CommitMetadataUtils.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieDeltaWriteStat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.marker.WriteMarkers;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CommitMetadataUtils {
+
+ /* In spark mor table, task retries may generate log files which are not
included in write status.
+ * We need to add these to CommitMetadata so that it will be synced to MDT.
+ */
+ public static HoodieCommitMetadata
reconcileMetadataForMissingFiles(HoodieTable table, String commitActionType,
String instantTime,
+
HoodieCommitMetadata commitMetadata, HoodieWriteConfig config,
+
HoodieEngineContext context, Configuration hadoopConf, String
classNameForContext) throws IOException {
+ if
(!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ)
+ || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION))
{
+ return commitMetadata;
+ }
+
+ WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(),
table, instantTime);
+ // if there is log files in this delta commit, we search any invalid log
files generated by failed spark task
+ boolean hasLogFileInDeltaCommit = commitMetadata.getPartitionToWriteStats()
+ .values().stream().flatMap(List::stream)
+ .anyMatch(writeStat -> FSUtils.isLogFile(new
Path(config.getBasePath(), writeStat.getPath()).getName()));
+ if (hasLogFileInDeltaCommit) { // skip for COW table
+ // get all log files generated by makers
+ Set<String> allLogFilesMarkerPath = new
HashSet<>(markers.getAppendedLogPaths(context,
config.getFinalizeWriteParallelism()));
+ Set<String> logFilesMarkerPath = new HashSet<>();
+ allLogFilesMarkerPath.stream().filter(logFilePath ->
!logFilePath.endsWith("cdc")).forEach(logFilePath ->
logFilesMarkerPath.add(logFilePath));
+
+ // remove valid log files
+ for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats :
commitMetadata.getPartitionToWriteStats().entrySet()) {
+ for (HoodieWriteStat hoodieWriteStat :
partitionAndWriteStats.getValue()) {
+ logFilesMarkerPath.remove(hoodieWriteStat.getPath());
+ }
+ }
+
+ // remaining are log files generated by retried spark task, let's
generate write stat for them
+ if (logFilesMarkerPath.size() > 0) {
+ SerializableConfiguration serializableConfiguration = new
SerializableConfiguration(hadoopConf);
+ context.setJobStatus(classNameForContext, "Preparing data for missing
files to assit with generatin write stats");
+ // populate partition -> map (fileId -> HoodieWriteStat) // we just
need one write stat per fileID to fetch some info about
+ // the file slice of interest to populate WriteStat.
+ HoodiePairData<String, Map<String, HoodieWriteStat>>
partitionToWriteStatHoodieData = getPartitionToFileIdToFilesMap(commitMetadata,
context);
+
+ String localBasePath = config.getBasePath();
+ // populate partition -> map (fileId -> List <missing log file names>)
+ HoodiePairData<String, Map<String, List<String>>>
partitionToMissingLogFilesHoodieData =
+ getPartitionToFileIdToMissingLogFileMap(localBasePath,
logFilesMarkerPath, context, config.getFileListingParallelism());
+
+ // TODO: make one call per partition to fetch file sizes.
+ context.setJobStatus(classNameForContext, "Generating writeStat for
missing log files");
+
+ // lets join both to generate write stats for missing log files
+ List<Pair<String, List<HoodieWriteStat>>> additionalLogFileWriteStat =
partitionToWriteStatHoodieData
+ .join(partitionToMissingLogFilesHoodieData)
Review Comment:
Is it possible that inner join can ignore some missing log files? I don't
think on object storage with strong read-after-write consistency this can
happen.
##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java:
##########
@@ -191,6 +191,30 @@ public <W> HoodiePairData<K, Pair<V, Option<W>>>
leftOuterJoin(HoodiePairData<K,
return new HoodieListPairData<>(leftOuterJoined, lazy);
}
+ @Override
+ public <W> HoodiePairData<K, Pair<V, W>> join(HoodiePairData<K, W> other) {
+ ValidationUtils.checkArgument(other instanceof HoodieListPairData);
+
+ // Transform right-side container to a multi-map of [[K]] to [[List<W>]]
values
+ HashMap<K, List<W>> rightStreamMap = ((HoodieListPairData<K, W>)
other).asStream().collect(
+ Collectors.groupingBy(
+ Pair::getKey,
+ HashMap::new,
+ Collectors.mapping(Pair::getValue, Collectors.toList())));
Review Comment:
Here, we're converting the right-side of the join (`other`) into a Stream,
and then using the collect method to aggregate this stream into a HashMap
(`rightStreamMap`). This map holds all keys and associated values of the right
side in memory. If the `other` dataset is large, this could lead to significant
memory usage. Maybe just the keys can be held in memory for a presence check.
Something like below:
Something like below:
```
public <W> HoodiePairData<K, Pair<V, W>> join(HoodiePairData<K, W> other) {
ValidationUtils.checkArgument(other instanceof HoodieListPairData);
// Transform right-side container to a multi-map of [[K]] to [[List<W>]]
values
Map<K, List<W>> rightStreamMap = ((HoodieListPairData<K, W>)
other).asStream().collect(
Collectors.groupingBy(
Pair::getKey,
Collectors.mapping(Pair::getValue, Collectors.toList())));
List<Pair<K, Pair<V, W>>> joinResult = new ArrayList<>();
asStream().forEach(pair -> {
K key = pair.getKey();
V leftValue = pair.getValue();
List<W> rightValues = rightStreamMap.getOrDefault(key,
Collections.emptyList());
for (W rightValue : rightValues) {
joinResult.add(Pair.of(key, Pair.of(leftValue, rightValue)));
}
});
return new HoodieListPairData<>(joinResult.stream(), lazy);
}
```
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/CommitMetadataUtils.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieDeltaWriteStat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.marker.WriteMarkers;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CommitMetadataUtils {
+
+ /* In spark mor table, task retries may generate log files which are not
included in write status.
+ * We need to add these to CommitMetadata so that it will be synced to MDT.
+ */
+ public static HoodieCommitMetadata
reconcileMetadataForMissingFiles(HoodieTable table, String commitActionType,
String instantTime,
+
HoodieCommitMetadata commitMetadata, HoodieWriteConfig config,
+
HoodieEngineContext context, Configuration hadoopConf, String
classNameForContext) throws IOException {
+ if
(!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ)
+ || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION))
{
+ return commitMetadata;
+ }
+
+ WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(),
table, instantTime);
+ // if there is log files in this delta commit, we search any invalid log
files generated by failed spark task
+ boolean hasLogFileInDeltaCommit = commitMetadata.getPartitionToWriteStats()
+ .values().stream().flatMap(List::stream)
+ .anyMatch(writeStat -> FSUtils.isLogFile(new
Path(config.getBasePath(), writeStat.getPath()).getName()));
+ if (hasLogFileInDeltaCommit) { // skip for COW table
+ // get all log files generated by makers
+ Set<String> allLogFilesMarkerPath = new
HashSet<>(markers.getAppendedLogPaths(context,
config.getFinalizeWriteParallelism()));
+ Set<String> logFilesMarkerPath = new HashSet<>();
+ allLogFilesMarkerPath.stream().filter(logFilePath ->
!logFilePath.endsWith("cdc")).forEach(logFilePath ->
logFilesMarkerPath.add(logFilePath));
+
+ // remove valid log files
+ for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats :
commitMetadata.getPartitionToWriteStats().entrySet()) {
+ for (HoodieWriteStat hoodieWriteStat :
partitionAndWriteStats.getValue()) {
+ logFilesMarkerPath.remove(hoodieWriteStat.getPath());
+ }
+ }
+
+ // remaining are log files generated by retried spark task, let's
generate write stat for them
+ if (logFilesMarkerPath.size() > 0) {
+ SerializableConfiguration serializableConfiguration = new
SerializableConfiguration(hadoopConf);
+ context.setJobStatus(classNameForContext, "Preparing data for missing
files to assit with generatin write stats");
+ // populate partition -> map (fileId -> HoodieWriteStat) // we just
need one write stat per fileID to fetch some info about
+ // the file slice of interest to populate WriteStat.
+ HoodiePairData<String, Map<String, HoodieWriteStat>>
partitionToWriteStatHoodieData = getPartitionToFileIdToFilesMap(commitMetadata,
context);
+
+ String localBasePath = config.getBasePath();
+ // populate partition -> map (fileId -> List <missing log file names>)
+ HoodiePairData<String, Map<String, List<String>>>
partitionToMissingLogFilesHoodieData =
+ getPartitionToFileIdToMissingLogFileMap(localBasePath,
logFilesMarkerPath, context, config.getFileListingParallelism());
+
+ // TODO: make one call per partition to fetch file sizes.
+ context.setJobStatus(classNameForContext, "Generating writeStat for
missing log files");
+
+ // lets join both to generate write stats for missing log files
+ List<Pair<String, List<HoodieWriteStat>>> additionalLogFileWriteStat =
partitionToWriteStatHoodieData
+ .join(partitionToMissingLogFilesHoodieData)
+ .map((SerializableFunction<Pair<String, Pair<Map<String,
HoodieWriteStat>, Map<String, List<String>>>>, Pair<String,
List<HoodieWriteStat>>>) v1 -> {
+ final Path basePathLocal = new Path(localBasePath);
+ String partitionPath = v1.getKey();
+ Map<String, HoodieWriteStat> fileIdToOriginalWriteStat =
v1.getValue().getKey();
+ Map<String, List<String>> missingFileIdToLogFileNames =
v1.getValue().getValue();
+ List<String> missingLogFileNames = new ArrayList<>();
+ missingFileIdToLogFileNames.values().forEach(logfiles ->
missingLogFileNames.addAll(logfiles));
+
+ // fetch file sizes from FileSystem
+ Path fullPartitionPath =
StringUtils.isNullOrEmpty(partitionPath) ? new Path(localBasePath) : new
Path(localBasePath, partitionPath);
+ FileSystem fileSystem =
fullPartitionPath.getFileSystem(serializableConfiguration.get());
+ List<Option<FileStatus>> fileStatuesOpt =
FSUtils.getFileStatusesUnderPartition(fileSystem, fullPartitionPath,
missingLogFileNames, true);
+ List<FileStatus> fileStatuses =
fileStatuesOpt.stream().filter(fileStatusOpt ->
fileStatusOpt.isPresent()).map(fileStatusOption ->
fileStatusOption.get()).collect(Collectors.toList());
+
+ // populate fileId -> List<FileStatus>
+ Map<String, List<FileStatus>> missingFileIdToLogFilesList = new
HashMap<>();
+ fileStatuses.forEach(fileStatus -> {
+ String fileId =
FSUtils.getFileIdFromLogPath(fileStatus.getPath());
+ if (!missingFileIdToLogFilesList.containsKey(fileId)) {
+ missingFileIdToLogFilesList.put(fileId, new ArrayList<>());
+ }
+ missingFileIdToLogFilesList.get(fileId).add(fileStatus);
Review Comment:
can be simplified using putIfAbsent
```
missingFileIdToLogFilesList.putIfAbsent(fileId, new ArrayList<>());
missingFileIdToLogFilesList.get(fileId).add(fileStatus);
```
##########
hudi-common/src/main/avro/HoodieRollbackMetadata.avsc:
##########
@@ -38,7 +38,18 @@
"type": "long",
"doc": "Size of this file in bytes"
}
- }], "default":null }
+ }], "default":null },
+ {"name": "logFilesFromFailedCommit",
Review Comment:
Should we make the schema change in a separate commit? Just a suggestion, in
case the change needs to be reverted, schema can still go in.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/CommitMetadataUtils.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieDeltaWriteStat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.marker.WriteMarkers;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CommitMetadataUtils {
+
+ /* In spark mor table, task retries may generate log files which are not
included in write status.
+ * We need to add these to CommitMetadata so that it will be synced to MDT.
+ */
+ public static HoodieCommitMetadata
reconcileMetadataForMissingFiles(HoodieTable table, String commitActionType,
String instantTime,
+
HoodieCommitMetadata commitMetadata, HoodieWriteConfig config,
+
HoodieEngineContext context, Configuration hadoopConf, String
classNameForContext) throws IOException {
+ if
(!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ)
+ || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION))
{
+ return commitMetadata;
+ }
+
+ WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(),
table, instantTime);
+ // if there is log files in this delta commit, we search any invalid log
files generated by failed spark task
+ boolean hasLogFileInDeltaCommit = commitMetadata.getPartitionToWriteStats()
+ .values().stream().flatMap(List::stream)
+ .anyMatch(writeStat -> FSUtils.isLogFile(new
Path(config.getBasePath(), writeStat.getPath()).getName()));
+ if (hasLogFileInDeltaCommit) { // skip for COW table
+ // get all log files generated by makers
+ Set<String> allLogFilesMarkerPath = new
HashSet<>(markers.getAppendedLogPaths(context,
config.getFinalizeWriteParallelism()));
+ Set<String> logFilesMarkerPath = new HashSet<>();
+ allLogFilesMarkerPath.stream().filter(logFilePath ->
!logFilePath.endsWith("cdc")).forEach(logFilePath ->
logFilesMarkerPath.add(logFilePath));
+
+ // remove valid log files
+ for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats :
commitMetadata.getPartitionToWriteStats().entrySet()) {
+ for (HoodieWriteStat hoodieWriteStat :
partitionAndWriteStats.getValue()) {
+ logFilesMarkerPath.remove(hoodieWriteStat.getPath());
+ }
+ }
+
+ // remaining are log files generated by retried spark task, let's
generate write stat for them
+ if (logFilesMarkerPath.size() > 0) {
+ SerializableConfiguration serializableConfiguration = new
SerializableConfiguration(hadoopConf);
+ context.setJobStatus(classNameForContext, "Preparing data for missing
files to assit with generatin write stats");
+ // populate partition -> map (fileId -> HoodieWriteStat) // we just
need one write stat per fileID to fetch some info about
+ // the file slice of interest to populate WriteStat.
+ HoodiePairData<String, Map<String, HoodieWriteStat>>
partitionToWriteStatHoodieData = getPartitionToFileIdToFilesMap(commitMetadata,
context);
+
+ String localBasePath = config.getBasePath();
+ // populate partition -> map (fileId -> List <missing log file names>)
+ HoodiePairData<String, Map<String, List<String>>>
partitionToMissingLogFilesHoodieData =
+ getPartitionToFileIdToMissingLogFileMap(localBasePath,
logFilesMarkerPath, context, config.getFileListingParallelism());
+
+ // TODO: make one call per partition to fetch file sizes.
Review Comment:
Isn't this already happening in `FSUtils.getFileStatusesUnderPartition`?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java:
##########
@@ -203,11 +203,19 @@ protected Path getMarkerPath(String partitionPath, String
dataFileName, IOType t
/**
* @param context {@code HoodieEngineContext} instance.
* @param parallelism parallelism for reading the marker files in the
directory.
- * @return all the data file paths of write IO type "CREATE" and "MERGE"
+ * @return all the data file or log file paths of write IO type "CREATE" and
"MERGE"
* @throws IOException
*/
public abstract Set<String> createdAndMergedDataPaths(HoodieEngineContext
context, int parallelism) throws IOException;
+ /**
+ * @param context {@code HoodieEngineContext} instance.
+ * @param parallelism parallelism for reading the marker files in the
directory.
+ * @return all the log file paths of write IO type "APPEND"
+ * @throws IOException
+ */
+ public abstract Set<String> getAppendedLogPaths(HoodieEngineContext context,
int parallelism) throws IOException;
Review Comment:
Please add a unit test for this method.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java:
##########
@@ -75,64 +87,150 @@ public List<HoodieRollbackRequest>
getRollbackRequests(HoodieInstant instantToRo
List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
table, context, instantToRollback.getTimestamp(),
config.getRollbackParallelism());
int parallelism = Math.max(Math.min(markerPaths.size(),
config.getRollbackParallelism()), 1);
- return context.map(markerPaths, markerFilePath -> {
+ List<HoodieRollbackRequest> rollbackRequests = context.map(markerPaths,
markerFilePath -> {
String typeStr =
markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
IOType type = IOType.valueOf(typeStr);
+ String fileNameWithPartitionToRollback =
WriteMarkers.stripMarkerSuffix(markerFilePath);
+ Path fullFilePathToRollback = new Path(basePath,
fileNameWithPartitionToRollback);
+ String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), fullFilePathToRollback.getParent());
switch (type) {
case MERGE:
case CREATE:
- String fileToDelete =
WriteMarkers.stripMarkerSuffix(markerFilePath);
- Path fullDeletePath = new Path(basePath, fileToDelete);
- String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), fullDeletePath.getParent());
- return new HoodieRollbackRequest(partitionPath, EMPTY_STRING,
EMPTY_STRING,
- Collections.singletonList(fullDeletePath.toString()),
+ String fileId = null;
+ String baseInstantTime = null;
+ if (FSUtils.isBaseFile(fullFilePathToRollback)) {
+ HoodieBaseFile baseFileToDelete = new
HoodieBaseFile(fullFilePathToRollback.toString());
+ fileId = baseFileToDelete.getFileId();
+ baseInstantTime = baseFileToDelete.getCommitTime();
+ } else if (FSUtils.isLogFile(fullFilePathToRollback)) {
+ // TODO: HUDI-1517 may distinguish log file created from log
file being appended in the future @guanziyue
+ // Now it should not have create type
+ checkArgument(type != IOType.CREATE, "Log file should not
support create io type now");
+ checkArgument(type != IOType.MERGE, "Log file should not support
merge io type");
Review Comment:
I'm not sure I follow the validation here. These validations are under the
MERGE and CREATE cases, so one of them will fail, right?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/CommitMetadataUtils.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieDeltaWriteStat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.marker.WriteMarkers;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CommitMetadataUtils {
+
+ /* In spark mor table, task retries may generate log files which are not
included in write status.
+ * We need to add these to CommitMetadata so that it will be synced to MDT.
+ */
+ public static HoodieCommitMetadata
reconcileMetadataForMissingFiles(HoodieTable table, String commitActionType,
String instantTime,
Review Comment:
If we can further break down the method into smaller methods, that would be
great. Also, please add unit tests for the util methods.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java:
##########
@@ -162,24 +199,126 @@ List<Pair<String, HoodieRollbackStat>>
maybeDeleteAndCollectStats(HoodieEngineCo
1L
);
+ // With listing based rollback, sometimes we only get the fileID of
interest(so that we can add rollback command block) w/o the actual file name.
+ // So, we want to ignore such invalid files from this list before we
add it to the rollback stats.
+ String partitionFullPath = metaClient.getBasePathV2().toString();
+ Map<String, Long> validLogBlocksToDelete = new HashMap<>();
+
rollbackRequest.getLogBlocksToBeDeleted().entrySet().stream().forEach((kv) -> {
+ String logFileFullPath = kv.getKey();
+ String logFileName = logFileFullPath.replace(partitionFullPath, "");
+ if (!StringUtils.isNullOrEmpty(logFileName)) {
+ validLogBlocksToDelete.put(kv.getKey(), kv.getValue());
+ }
+ });
+
return Collections.singletonList(
- Pair.of(rollbackRequest.getPartitionPath(),
- HoodieRollbackStat.newBuilder()
- .withPartitionPath(rollbackRequest.getPartitionPath())
- .withRollbackBlockAppendResults(filesToNumBlocksRollback)
- .build()))
+ Pair.of(rollbackRequest.getPartitionPath(),
+ HoodieRollbackStat.newBuilder()
+ .withPartitionPath(rollbackRequest.getPartitionPath())
+
.withRollbackBlockAppendResults(filesToNumBlocksRollback)
+ .withLogFilesFromFailedCommit(validLogBlocksToDelete)
+ .build()))
.stream();
} else {
return Collections.singletonList(
- Pair.of(rollbackRequest.getPartitionPath(),
- HoodieRollbackStat.newBuilder()
- .withPartitionPath(rollbackRequest.getPartitionPath())
- .build()))
+ Pair.of(rollbackRequest.getPartitionPath(),
+ HoodieRollbackStat.newBuilder()
+ .withPartitionPath(rollbackRequest.getPartitionPath())
+ .build()))
.stream();
}
}, numPartitions);
}
+ private HoodieLogFileWriteCallback getRollbackLogMarkerCallback(final
WriteMarkers writeMarkers, String partitionPath, String fileId) {
+ return new HoodieLogFileWriteCallback() {
+ @Override
+ public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
+ // there may be existed marker file if fs support append. So always
return true;
+ createAppendMarker(logFileToAppend);
+ return true;
+ }
+
+ @Override
+ public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
+ return createAppendMarker(logFileToCreate);
+ }
+
+ private boolean createAppendMarker(HoodieLogFile logFileToAppend) {
+ return writeMarkers.createIfNotExists(partitionPath,
logFileToAppend.getFileName(), IOType.APPEND,
+ config, fileId, metaClient.getActiveTimeline()).isPresent();
+ }
+ };
+ }
+
+ /**
+ * If there are log files created by previous rollback attempts, we want to
add them to rollback stats so that MDT is able to track them.
+ * @param context
+ * @param originalRollbackStats
+ * @param logPaths
+ * @return
+ */
+ private List<HoodieRollbackStat>
addLogFilesFromPreviousFailedRollbacksToStat(HoodieEngineContext context,
+
List<HoodieRollbackStat> originalRollbackStats,
+
Set<String> logPaths) {
+ if (logPaths.isEmpty()) {
+ // if rollback is not failed and re-attempted, we should not find any
additional log files here.
+ return originalRollbackStats;
+ }
+
+ final String basePathStr = config.getBasePath();
+ List<String> logFiles = new ArrayList<>(logPaths);
+ // populate partitionPath -> List<log file name>
+ HoodiePairData<String, List<String>> partitionPathToLogFilesHoodieData =
context.parallelize(logFiles)
+ // lets map each log file to partition path and log file name
+ .mapToPair((SerializablePairFunction<String, String, String>) t -> {
+ Path logFilePath = new Path(basePathStr, t);
+ String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePathStr), logFilePath.getParent());
+ return Pair.of(partitionPath, logFilePath.getName());
+ })
+ // lets group by partition path and collect it as log file list per
partition path
+ .groupByKey().mapToPair((SerializablePairFunction<Pair<String,
Iterable<String>>, String, List<String>>) t -> {
+ List<String> allFiles = new ArrayList<>();
+ t.getRight().forEach(entry -> allFiles.add(entry));
+ return Pair.of(t.getKey(), allFiles);
+ });
+
+ // populate partitionPath -> HoodieRollbackStat
+ HoodiePairData<String, HoodieRollbackStat>
partitionPathToRollbackStatsHoodieData =
+ context.parallelize(originalRollbackStats)
+ .mapToPair((SerializablePairFunction<HoodieRollbackStat, String,
HoodieRollbackStat>) t -> Pair.of(t.getPartitionPath(), t));
+
+ SerializableConfiguration serializableConfiguration = new
SerializableConfiguration(table.getHadoopConf());
Review Comment:
Configuration isn't gonna change right. How about making the
`SerializableConfiguration` a class-level object so that it doesn’t need to be
recreated for each method call?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java:
##########
@@ -75,64 +87,150 @@ public List<HoodieRollbackRequest>
getRollbackRequests(HoodieInstant instantToRo
List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
table, context, instantToRollback.getTimestamp(),
config.getRollbackParallelism());
int parallelism = Math.max(Math.min(markerPaths.size(),
config.getRollbackParallelism()), 1);
- return context.map(markerPaths, markerFilePath -> {
+ List<HoodieRollbackRequest> rollbackRequests = context.map(markerPaths,
markerFilePath -> {
String typeStr =
markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
IOType type = IOType.valueOf(typeStr);
+ String fileNameWithPartitionToRollback =
WriteMarkers.stripMarkerSuffix(markerFilePath);
+ Path fullFilePathToRollback = new Path(basePath,
fileNameWithPartitionToRollback);
+ String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), fullFilePathToRollback.getParent());
switch (type) {
case MERGE:
case CREATE:
- String fileToDelete =
WriteMarkers.stripMarkerSuffix(markerFilePath);
- Path fullDeletePath = new Path(basePath, fileToDelete);
- String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), fullDeletePath.getParent());
- return new HoodieRollbackRequest(partitionPath, EMPTY_STRING,
EMPTY_STRING,
- Collections.singletonList(fullDeletePath.toString()),
+ String fileId = null;
+ String baseInstantTime = null;
+ if (FSUtils.isBaseFile(fullFilePathToRollback)) {
+ HoodieBaseFile baseFileToDelete = new
HoodieBaseFile(fullFilePathToRollback.toString());
+ fileId = baseFileToDelete.getFileId();
+ baseInstantTime = baseFileToDelete.getCommitTime();
+ } else if (FSUtils.isLogFile(fullFilePathToRollback)) {
+ // TODO: HUDI-1517 may distinguish log file created from log
file being appended in the future @guanziyue
+ // Now it should not have create type
+ checkArgument(type != IOType.CREATE, "Log file should not
support create io type now");
+ checkArgument(type != IOType.MERGE, "Log file should not support
merge io type");
+ HoodieLogFile logFileToDelete = new
HoodieLogFile(fullFilePathToRollback.toString());
+ fileId = logFileToDelete.getFileId();
+ baseInstantTime = logFileToDelete.getBaseCommitTime();
+ }
+ Objects.requireNonNull(fileId, "Cannot find valid fileId from
path: " + fullFilePathToRollback);
+ Objects.requireNonNull(baseInstantTime, "Cannot find valid base
instant from path: " + fullFilePathToRollback);
+ return new HoodieRollbackRequest(partitionPath, fileId,
baseInstantTime,
+ Collections.singletonList(fullFilePathToRollback.toString()),
Collections.emptyMap());
case APPEND:
- // NOTE: This marker file-path does NOT correspond to a log-file,
but rather is a phony
- // path serving as a "container" for the following
components:
- // - Base file's file-id
- // - Base file's commit instant
- // - Partition path
- return getRollbackRequestForAppend(instantToRollback,
WriteMarkers.stripMarkerSuffix(markerFilePath));
+ HoodieRollbackRequest rollbackRequestForAppend =
getRollbackRequestForAppend(instantToRollback, fileNameWithPartitionToRollback);
+ return rollbackRequestForAppend;
default:
throw new HoodieRollbackException("Unknown marker type, during
rollback of " + instantToRollback);
}
}, parallelism);
+
+ if (rollbackRequests.isEmpty()) {
+ return rollbackRequests;
+ }
+ // we need to ensure we return one rollback request per partition path,
fileId, baseInstant triplet.
+ HoodieData<HoodieRollbackRequest> processedRollbackRequests =
context.parallelize(rollbackRequests)
+ .mapToPair(rollbackRequest ->
Pair.of(Pair.of(rollbackRequest.getPartitionPath(),
rollbackRequest.getFileId()), rollbackRequest))
+ .reduceByKey((SerializableBiFunction<HoodieRollbackRequest,
HoodieRollbackRequest, HoodieRollbackRequest>)
RollbackUtils::mergeRollbackRequest, parallelism).values();
+
+ SerializableConfiguration serializableConfiguration = new
SerializableConfiguration(context.getHadoopConf());
+ // fetch valid sizes for log files to rollback. Optimize to make minimum
calls (one per partition or file slice since fs calls has to be made)
+ return fetchFileSizesForLogFilesToRollback(processedRollbackRequests,
serializableConfiguration).collectAsList();
} catch (Exception e) {
throw new HoodieRollbackException("Error rolling back using marker files
written for " + instantToRollback, e);
}
}
- protected HoodieRollbackRequest getRollbackRequestForAppend(HoodieInstant
instantToRollback, String markerFilePath) throws IOException {
- Path baseFilePathForAppend = new Path(basePath, markerFilePath);
- String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
- String baseCommitTime =
FSUtils.getCommitTime(baseFilePathForAppend.getName());
- String relativePartitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), baseFilePathForAppend.getParent());
- Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(),
relativePartitionPath);
-
- // NOTE: Since we're rolling back incomplete Delta Commit, it only could
have appended its
- // block to the latest log-file
- // TODO(HUDI-1517) use provided marker-file's path instead
- Option<HoodieLogFile> latestLogFileOption =
FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
- HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);
-
- // Log file can be deleted if the commit to rollback is also the commit
that created the fileGroup
- if (latestLogFileOption.isPresent() &&
baseCommitTime.equals(instantToRollback.getTimestamp())) {
- Path fullDeletePath = new Path(partitionPath,
latestLogFileOption.get().getFileName());
- return new HoodieRollbackRequest(relativePartitionPath, EMPTY_STRING,
EMPTY_STRING,
- Collections.singletonList(fullDeletePath.toString()),
- Collections.emptyMap());
- }
-
- Map<String, Long> logFilesWithBlocsToRollback = new HashMap<>();
- if (latestLogFileOption.isPresent()) {
- HoodieLogFile latestLogFile = latestLogFileOption.get();
- // NOTE: Marker's don't carry information about the cumulative size of
the blocks that have been appended,
- // therefore we simply stub this value.
- logFilesWithBlocsToRollback =
Collections.singletonMap(latestLogFile.getFileStatus().getPath().toString(),
-1L);
+ protected HoodieData<HoodieRollbackRequest>
fetchFileSizesForLogFilesToRollback(HoodieData<HoodieRollbackRequest>
rollbackRequestHoodieData,
+
SerializableConfiguration serializableConfiguration) {
+
+ // for log blocks to be deleted, lets fetch the actual size from FS
+ return rollbackRequestHoodieData
+ .map((SerializableFunction<HoodieRollbackRequest,
HoodieRollbackRequest>) rollbackRequest -> {
+ if (rollbackRequest.getLogBlocksToBeDeleted() == null ||
rollbackRequest.getLogBlocksToBeDeleted().isEmpty()) {
+ return rollbackRequest;
+ }
+ // if there are log blocks to delete, fetch the appropriate size and
set them.
+ String relativePartitionPathStr = rollbackRequest.getPartitionPath();
+ Path relativePartitionPath = new Path(basePath + "/" +
relativePartitionPathStr);
+ String fileId = rollbackRequest.getFileId();
+ String baseCommitTime = rollbackRequest.getLatestBaseInstant();
+ List<String> filesToDelete = rollbackRequest.getFilesToBeDeleted();
+ Map<String, Long> logBlocksToBeDeleted =
rollbackRequest.getLogBlocksToBeDeleted();
+ Map<String, Long> logBlocksWithValidSizes = new HashMap<>();
+ List<String> logFilesNeedingFileSizes = new ArrayList<>();
+
+ // for those which has valid sizes already, we don't need to fetch
them again
+ logBlocksToBeDeleted.forEach((k, v) -> {
+ if (v != -1L) {
+ logBlocksWithValidSizes.put(k, v);
+ } else {
+ logFilesNeedingFileSizes.add(k);
+ }
+ });
+
+ FileSystem fs =
relativePartitionPath.getFileSystem(serializableConfiguration.get());
+ List<Option<FileStatus>> fileStatuses =
FSUtils.getFileStatusesUnderPartition(fs, relativePartitionPath,
logFilesNeedingFileSizes, true);
+
fileStatuses.stream().filter(Option::isPresent).map(Option::get).forEach(fileStatus
-> {
+ logBlocksWithValidSizes.put(fileStatus.getPath().getName(),
fileStatus.getLen());
+ });
+ return new HoodieRollbackRequest(relativePartitionPathStr, fileId,
baseCommitTime, filesToDelete,
+ logBlocksWithValidSizes);
+ });
+ }
+
+ protected HoodieRollbackRequest getRollbackRequestForAppend(HoodieInstant
instantToRollback, String fileNameWithPartitionToRollback) throws IOException {
+ Path fullLogFilePath = new Path(basePath, fileNameWithPartitionToRollback);
+ String fileId;
+ String baseCommitTime;
+ String relativePartitionPath;
+ Option<HoodieLogFile> latestLogFileOption;
+
+ // Old marker files may be generated from base file name before HUDI-1517.
keep compatible with them.
+ // TODO: deprecated in HUDI-1517, may be removed in the future.
@guanziyue.gzy
+
Review Comment:
let's remove these unnecessary newlines.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableRollback.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.functional;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+
+@Tag("functional")
+public class TestHoodieSparkCopyOnWriteTableRollback extends
TestHoodieSparkRollback {
Review Comment:
For all the tests here, the behavior shouldn't change, right? As this feature
is only for MOR.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkRollback.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.functional;
+
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.client.functional.TestHoodieBackedMetadata;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieDeltaWriteStat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.marker.WriteMarkers;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.List;
+
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieSparkRollback extends SparkClientFunctionalTestHarness {
+
+ private String basePath;
+
+ private void initBasePath() {
+ basePath = basePath().substring(7);
+ }
+
+ private SparkRDDWriteClient getHoodieWriteClient(Boolean autoCommitEnabled)
throws IOException {
+ return
getHoodieWriteClient(getConfigToTestMDTRollbacks(autoCommitEnabled));
+ }
+
+ protected List<HoodieRecord> insertRecords(SparkRDDWriteClient client,
HoodieTestDataGenerator dataGen, String commitTime) {
+ /*
+ * Write 1 (only inserts, written as base file)
+ */
+ client.startCommitWithTime(commitTime);
+
+ List<HoodieRecord> records = dataGen.generateInserts(commitTime, 20);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+
+ List<WriteStatus> statuses = client.upsert(writeRecords,
commitTime).collect();
+ assertNoWriteErrors(statuses);
+ return records;
+ }
+
+ protected List<WriteStatus> updateRecords(SparkRDDWriteClient client,
HoodieTestDataGenerator dataGen, String commitTime,
+ List<HoodieRecord> records) throws
IOException {
+ client.startCommitWithTime(commitTime);
+
+ records = dataGen.generateUpdates(commitTime, records);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+ List<WriteStatus> statuses = client.upsert(writeRecords,
commitTime).collect();
+ assertNoWriteErrors(statuses);
+ return statuses;
+ }
+
+ protected HoodieWriteConfig getConfigToTestMDTRollbacks(Boolean autoCommit) {
+ return getConfigToTestMDTRollbacks(autoCommit, true);
+ }
+
+ protected HoodieWriteConfig getConfigToTestMDTRollbacks(Boolean autoCommit,
Boolean mdtEnable) {
+ return HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withProperties(getPropertiesForKeyGen(true))
+ .withSchema(TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2, 2)
+ .withDeleteParallelism(2)
+ .withAutoCommit(autoCommit)
+ .withEmbeddedTimelineServerEnabled(false).forTable("test-trip-table")
Review Comment:
why is timeline server disabled?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/CommitMetadataUtils.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieDeltaWriteStat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.marker.WriteMarkers;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CommitMetadataUtils {
+
+ /* In spark mor table, task retries may generate log files which are not
included in write status.
+ * We need to add these to CommitMetadata so that it will be synced to MDT.
+ */
+ public static HoodieCommitMetadata
reconcileMetadataForMissingFiles(HoodieTable table, String commitActionType,
String instantTime,
+
HoodieCommitMetadata commitMetadata, HoodieWriteConfig config,
+
HoodieEngineContext context, Configuration hadoopConf, String
classNameForContext) throws IOException {
+ if
(!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ)
+ || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION))
{
+ return commitMetadata;
+ }
+
+ WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(),
table, instantTime);
+ // if there is log files in this delta commit, we search any invalid log
files generated by failed spark task
+ boolean hasLogFileInDeltaCommit = commitMetadata.getPartitionToWriteStats()
+ .values().stream().flatMap(List::stream)
+ .anyMatch(writeStat -> FSUtils.isLogFile(new
Path(config.getBasePath(), writeStat.getPath()).getName()));
+ if (hasLogFileInDeltaCommit) { // skip for COW table
+ // get all log files generated by makers
+ Set<String> allLogFilesMarkerPath = new
HashSet<>(markers.getAppendedLogPaths(context,
config.getFinalizeWriteParallelism()));
+ Set<String> logFilesMarkerPath = new HashSet<>();
+ allLogFilesMarkerPath.stream().filter(logFilePath ->
!logFilePath.endsWith("cdc")).forEach(logFilePath ->
logFilesMarkerPath.add(logFilePath));
+
+ // remove valid log files
+ for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats :
commitMetadata.getPartitionToWriteStats().entrySet()) {
+ for (HoodieWriteStat hoodieWriteStat :
partitionAndWriteStats.getValue()) {
+ logFilesMarkerPath.remove(hoodieWriteStat.getPath());
+ }
+ }
+
+ // remaining are log files generated by retried spark task, let's
generate write stat for them
+ if (logFilesMarkerPath.size() > 0) {
+ SerializableConfiguration serializableConfiguration = new
SerializableConfiguration(hadoopConf);
+ context.setJobStatus(classNameForContext, "Preparing data for missing
files to assit with generatin write stats");
+ // populate partition -> map (fileId -> HoodieWriteStat) // we just
need one write stat per fileID to fetch some info about
+ // the file slice of interest to populate WriteStat.
+ HoodiePairData<String, Map<String, HoodieWriteStat>>
partitionToWriteStatHoodieData = getPartitionToFileIdToFilesMap(commitMetadata,
context);
+
+ String localBasePath = config.getBasePath();
+ // populate partition -> map (fileId -> List <missing log file names>)
+ HoodiePairData<String, Map<String, List<String>>>
partitionToMissingLogFilesHoodieData =
+ getPartitionToFileIdToMissingLogFileMap(localBasePath,
logFilesMarkerPath, context, config.getFileListingParallelism());
+
+ // TODO: make one call per partition to fetch file sizes.
+ context.setJobStatus(classNameForContext, "Generating writeStat for
missing log files");
+
+ // lets join both to generate write stats for missing log files
+ List<Pair<String, List<HoodieWriteStat>>> additionalLogFileWriteStat =
partitionToWriteStatHoodieData
+ .join(partitionToMissingLogFilesHoodieData)
+ .map((SerializableFunction<Pair<String, Pair<Map<String,
HoodieWriteStat>, Map<String, List<String>>>>, Pair<String,
List<HoodieWriteStat>>>) v1 -> {
+ final Path basePathLocal = new Path(localBasePath);
+ String partitionPath = v1.getKey();
+ Map<String, HoodieWriteStat> fileIdToOriginalWriteStat =
v1.getValue().getKey();
+ Map<String, List<String>> missingFileIdToLogFileNames =
v1.getValue().getValue();
+ List<String> missingLogFileNames = new ArrayList<>();
+ missingFileIdToLogFileNames.values().forEach(logfiles ->
missingLogFileNames.addAll(logfiles));
+
+ // fetch file sizes from FileSystem
+ Path fullPartitionPath =
StringUtils.isNullOrEmpty(partitionPath) ? new Path(localBasePath) : new
Path(localBasePath, partitionPath);
+ FileSystem fileSystem =
fullPartitionPath.getFileSystem(serializableConfiguration.get());
+ List<Option<FileStatus>> fileStatuesOpt =
FSUtils.getFileStatusesUnderPartition(fileSystem, fullPartitionPath,
missingLogFileNames, true);
+ List<FileStatus> fileStatuses =
fileStatuesOpt.stream().filter(fileStatusOpt ->
fileStatusOpt.isPresent()).map(fileStatusOption ->
fileStatusOption.get()).collect(Collectors.toList());
+
+ // populate fileId -> List<FileStatus>
+ Map<String, List<FileStatus>> missingFileIdToLogFilesList = new
HashMap<>();
+ fileStatuses.forEach(fileStatus -> {
+ String fileId =
FSUtils.getFileIdFromLogPath(fileStatus.getPath());
+ if (!missingFileIdToLogFilesList.containsKey(fileId)) {
+ missingFileIdToLogFilesList.put(fileId, new ArrayList<>());
+ }
+ missingFileIdToLogFilesList.get(fileId).add(fileStatus);
+ });
+
+ List<HoodieWriteStat> missingWriteStats = new ArrayList();
+ missingFileIdToLogFilesList.forEach((k, logFileStatuses) -> {
+ String fileId = k;
+ HoodieDeltaWriteStat originalWriteStat =
+ (HoodieDeltaWriteStat)
fileIdToOriginalWriteStat.get(fileId); // are there chances that there won't be
any write stat in original list?
+ logFileStatuses.forEach(fileStatus -> {
+ // for every missing file, add a new HoodieDeltaWriteStat
+ HoodieDeltaWriteStat writeStat = new HoodieDeltaWriteStat();
+ HoodieLogFile logFile = new HoodieLogFile(fileStatus);
+ writeStat.setPath(basePathLocal, logFile.getPath());
+ writeStat.setPartitionPath(partitionPath);
+ writeStat.setFileId(fileId);
+ writeStat.setTotalWriteBytes(logFile.getFileSize());
+ writeStat.setFileSizeInBytes(logFile.getFileSize());
+ writeStat.setLogVersion(logFile.getLogVersion());
+ List<String> logFiles = new
ArrayList<>(originalWriteStat.getLogFiles());
+ logFiles.add(logFile.getFileName());
+ writeStat.setLogFiles(logFiles);
+ writeStat.setBaseFile(originalWriteStat.getBaseFile());
+ writeStat.setPrevCommit(logFile.getBaseCommitTime());
+ missingWriteStats.add(writeStat);
+ });
+ });
+ return Pair.of(partitionPath, missingWriteStats);
+ }).collectAsList();
+
+ // add these write stat to commit meta. deltaWriteStat can be empty
due to file missing. See code above to address FileNotFoundException
+ for (Pair<String, List<HoodieWriteStat>> partitionDeltaStats :
additionalLogFileWriteStat) {
+ String partitionPath = partitionDeltaStats.getKey();
+ partitionDeltaStats.getValue().forEach(ws ->
commitMetadata.addWriteStat(partitionPath, ws));
+ }
+ }
+ }
+ return commitMetadata;
+ }
+
+ private static HoodiePairData<String, Map<String, HoodieWriteStat>>
getPartitionToFileIdToFilesMap(HoodieCommitMetadata commitMetadata,
HoodieEngineContext context) {
+
+ List<Map.Entry<String, List<HoodieWriteStat>>> partitionToWriteStats = new
ArrayList(commitMetadata.getPartitionToWriteStats().entrySet());
+
+ return context.parallelize(partitionToWriteStats)
+ .mapToPair((SerializablePairFunction<Map.Entry<String,
List<HoodieWriteStat>>, String, Map<String, HoodieWriteStat>>) t -> {
+ Map<String, HoodieWriteStat> fileIdToWriteStat = new HashMap<>();
+ t.getValue().forEach(writeStat -> {
+ if (!fileIdToWriteStat.containsKey(writeStat.getFileId())) {
+ fileIdToWriteStat.put(writeStat.getFileId(), writeStat);
+ }
+ });
+ return Pair.of(t.getKey(), fileIdToWriteStat);
+ });
+ }
+
+ private static HoodiePairData<String, Map<String, List<String>>>
getPartitionToFileIdToMissingLogFileMap(String basePathStr, Set<String>
logFilesMarkerPath, HoodieEngineContext context,
Review Comment:
Something like below:
```
private static HoodiePairData<String, Map<String, List<String>>>
getPartitionToFileIdToMissingLogFileMap(
String basePathStr,
Set<String> logFilesMarkerPath,
HoodieEngineContext context,
int parallelism) {
List<String> logFilePaths = new ArrayList<>(logFilesMarkerPath);
return context.parallelize(logFilePaths,
parallelism).mapToPair(logFilePath -> {
Path logFileFullPath = new Path(basePathStr, logFilePath);
String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePathStr), logFileFullPath.getParent());
return Pair.of(partitionPath, logFileFullPath.getName());
})
.groupByKey()
.mapToPair(partitionAndLogFiles -> {
String partitionPath = partitionAndLogFiles.getKey();
Path fullPartitionPath = StringUtils.isNullOrEmpty(partitionPath)
? new Path(basePathStr) : new Path(basePathStr, partitionPath);
Map<String, List<String>> fileIdToLogFiles = new HashMap<>();
for (String logFile : partitionAndLogFiles.getValue()) {
String fileId = FSUtils.getFileIdFromLogPath(new
Path(fullPartitionPath, logFile));
fileIdToLogFiles
.computeIfAbsent(fileId, k -> new ArrayList<>())
.add(logFile);
}
return Pair.of(partitionPath, fileIdToLogFiles);
});
}
```
##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java:
##########
@@ -191,6 +191,30 @@ public <W> HoodiePairData<K, Pair<V, Option<W>>>
leftOuterJoin(HoodiePairData<K,
return new HoodieListPairData<>(leftOuterJoined, lazy);
}
+ @Override
+ public <W> HoodiePairData<K, Pair<V, W>> join(HoodiePairData<K, W> other) {
+ ValidationUtils.checkArgument(other instanceof HoodieListPairData);
+
+ // Transform right-side container to a multi-map of [[K]] to [[List<W>]]
values
+ HashMap<K, List<W>> rightStreamMap = ((HoodieListPairData<K, W>)
other).asStream().collect(
+ Collectors.groupingBy(
+ Pair::getKey,
+ HashMap::new,
+ Collectors.mapping(Pair::getValue, Collectors.toList())));
Review Comment:
Also, please add a ut for this in `TestHoodieListDataPairData`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]