nada-attia commented on code in PR #18142:
URL: https://github.com/apache/hudi/pull/18142#discussion_r2817664791
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java:
##########
@@ -28,12 +36,83 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
+import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
+
/**
* Utils class for performing various log file reading operations.
*/
public class LogReaderUtils {
+
+  /**
+   * Gets a map of log files which were created by commits with instant timestamps that are less than or equal to the
+   * maxCommitInstantTime. All other log files are filtered out. Each log file maps to a list of the commit instant
+   * times associated with each block found in the log file.
+   *
+   * @param metaClient           Hoodie table meta client
+   * @param fsView               Hoodie file system view
+   * @param partitionPaths       list of partition paths to fetch log files from; for the MDT, this should be "files" or
+   *                             MetadataPartitionType.FILES.partitionPath()
+   * @param maxCommitInstantTime the max commit which created the log files returned
+   * @param engineContext        engine context
+   * @return map of log file -> associated commit time for each block in the log file
+   */
+  public static Map<HoodieLogFile, List<String>> getAllLogFilesWithMaxCommit(HoodieTableMetaClient metaClient,
+                                                                             AbstractTableFileSystemView fsView,
+                                                                             List<String> partitionPaths,
+                                                                             String maxCommitInstantTime,
+                                                                             HoodieEngineContext engineContext) {
+    engineContext.setJobStatus("LogReaderUtils",
+        String.format("Getting list of log files in %s partition(s)", partitionPaths.size()));
+
+    List<HoodieLogFile> logFiles = engineContext.flatMap(partitionPaths, partitionPath -> fsView
+            .getLatestMergedFileSlicesBeforeOrOn(partitionPath, maxCommitInstantTime)
+            .flatMap(FileSlice::getLogFiles),
+        Math.max(partitionPaths.size(), 1));
+
+    // get completion time of the max commit instant
+    String maxCommitCompletionTime = metaClient.getActiveTimeline().filterCompletedInstants()
+        .findInstantsAfterOrEquals(maxCommitInstantTime, 1)
+        .filter(instant -> instant.requestedTime().equals(maxCommitInstantTime))
+        .getInstants()
+        .stream()
+        .map(instant -> instant.getCompletionTime())
+        .findFirst().orElse(maxCommitInstantTime);
+
+    // filter out all commits completed after the max commit completion time
+    HoodieTimeline filteredTimeline = fsView.getTimeline().filter(instant -> !fsView.getTimeline()
+        .findInstantsModifiedAfterByCompletionTime(maxCommitCompletionTime)
+        .containsInstant(instant));
+
+    engineContext.setJobStatus("LogReaderUtils",
+        String.format("Getting log file map for %s partition(s)", partitionPaths.size()));
+    Map<HoodieLogFile, List<String>> logFilesWithMaxCommit = engineContext.mapToPair(logFiles, logFile -> {
+      // read all blocks within the log file and find the commit associated with each log block
+      List<String> blocksWithinLogFile = new ArrayList<>();
+      HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getStorage(), logFile, null);
+      while (reader.hasNext()) {
+        HoodieLogBlock block = reader.next();
+        String logBlockInstantTime = block.getLogBlockHeader().get(INSTANT_TIME);
+        // check if the log file contains a block created by a commit that is older than or equal to max commit
+        if (filteredTimeline.containsInstant(logBlockInstantTime)) {
Review Comment:
I'm under the impression that we cannot archive a deltacommit which has not been compacted and cleaned; can you confirm? If that is true, the above case (the associated commit for a log block being archived) wouldn't be possible.
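
To make the concern concrete, here is a minimal, self-contained sketch (plain Java with made-up instant times, not the actual Hudi timeline APIs) of the membership check the new filtering relies on: if the deltacommit that wrote a block has been archived off the active timeline, the lookup misses and the block would be dropped.

```java
import java.util.Set;

public class ArchivedInstantSketch {
  public static void main(String[] args) {
    // Hypothetical stand-in for filteredTimeline above: instants still on the
    // active timeline that completed on or before maxCommitCompletionTime.
    Set<String> filteredActiveInstants = Set.of("20240105000000", "20240106000000");

    // A log block written by a deltacommit that has since been archived (made-up instant time).
    String archivedBlockInstant = "20240101000000";

    // Mirrors filteredTimeline.containsInstant(logBlockInstantTime) in the diff.
    boolean keepBlock = filteredActiveInstants.contains(archivedBlockInstant);
    System.out.println(keepBlock); // false -> the block would be skipped
  }
}
```

If deltacommits indeed cannot be archived before they are compacted and cleaned, this situation never arises and the check is safe as written.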