hudi-agent commented on code in PR #18987: URL: https://github.com/apache/hudi/pull/18987#discussion_r3459484681
########## hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java: ########## @@ -0,0 +1,539 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.table.read.lsm; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemas; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.read.BaseFileUpdateCallback; +import org.apache.hudi.common.table.read.BufferedRecord; +import org.apache.hudi.common.table.read.BufferedRecordMerger; +import org.apache.hudi.common.table.read.BufferedRecordMergerFactory; +import org.apache.hudi.common.table.read.BufferedRecords; +import org.apache.hudi.common.table.read.HoodieReadStats; +import org.apache.hudi.common.table.read.InputSplit; +import org.apache.hudi.common.table.read.ReaderParameters; +import org.apache.hudi.common.table.read.UpdateProcessor; +import 
org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.function.UnaryOperator; + +import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH; +import static org.apache.hudi.common.config.HoodieReaderConfig.LSM_SORT_MERGE_SPILL_THRESHOLD; +import static org.apache.hudi.common.schema.HoodieSchemaCompatibility.areSchemasProjectionEquivalent; +import static org.apache.hudi.io.util.FileIOUtils.getDefaultSpillableMapBasePath; + +/** + * Streaming sorted-merge iterator for RFC-103 LSM file groups. + * + * <p>The iterator merges one optional L1/base sorted run with zero or more L0/native parquet log + * runs. Every input file must already be sorted by record key; this class does not sort records + * within a file. The active head record from each participating run is tracked by a loser tree, + * which provides efficient k-way merge behavior while preserving deterministic source ordering for + * records with the same key. + * + * <p>Merge order follows the same conflict resolution model as {@code HoodieFileGroupReader}: the + * L1/base file is processed first, and L0 log files are processed in file-group log order so newer + * log instants or versions can win when ordering values tie. 
Native delete logs contain only delete + * metadata and are converted into {@link BufferedRecord} delete records before entering the merge. + * + * <p>To reduce open-reader memory pressure when a file group has many L0 runs, the iterator can spill + * selected L0 file iterators to sequential temporary files. Spilling changes how an input run is + * buffered, but not its merge order or merge semantics. The L1/base iterator is always kept direct, + * and native delete logs plus smaller L0 files are prioritized for direct reading. + */ +public class LsmFileGroupRecordIterator<T> implements ClosableIterator<BufferedRecord<T>> { + + private static final String DELETE_LOG_RECORD_KEY_FIELD = "record_key"; + private static final List<String> DELETE_LOG_ORDERING_FIELD_NAMES = Arrays.asList("ordering_val"); + + private final HoodieReaderContext<T> readerContext; + private final HoodieStorage storage; + private final InputSplit inputSplit; + private final HoodieSchema readerSchema; + private final List<String> orderingFieldNames; + private final boolean includeBaseFile; + private final BufferedRecordMerger<T> bufferedRecordMerger; + private final UpdateProcessor<T> updateProcessor; + private final LoserTree<T> readers; + private final int spillThreshold; + private final String spillBasePath; + private BufferedRecord<T> nextRecord; + + /** + * Creates an iterator that merges both the L1/base file, when present, and all L0 native log files. 
+ */ + public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext, + HoodieStorage storage, + InputSplit inputSplit, + List<String> orderingFieldNames, + HoodieTableMetaClient metaClient, + TypedProperties props, + ReaderParameters readerParameters, + HoodieReadStats readStats, + Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) throws IOException { + this(readerContext, storage, inputSplit, orderingFieldNames, metaClient, props, readerParameters, readStats, fileGroupUpdateCallback, true); + } + + /** + * Creates an iterator over an LSM file group. + * + * @param includeBaseFile whether the L1/base file should be included in the merge. Passing + * {@code false} produces a log-only view for callers that only need L0 data. + */ + public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext, + HoodieStorage storage, + InputSplit inputSplit, + List<String> orderingFieldNames, + HoodieTableMetaClient metaClient, + TypedProperties props, + ReaderParameters readerParameters, + HoodieReadStats readStats, + Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback, + boolean includeBaseFile) throws IOException { + this.readerContext = readerContext; + this.storage = storage; + this.inputSplit = inputSplit; + this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema(); + this.orderingFieldNames = orderingFieldNames; + this.includeBaseFile = includeBaseFile; + this.bufferedRecordMerger = BufferedRecordMergerFactory.create( + readerContext, readerContext.getMergeMode(), false, readerContext.getRecordMerger(), + readerSchema, readerContext.getPayloadClasses(props), props, metaClient.getTableConfig().getPartialUpdateMode()); + this.updateProcessor = UpdateProcessor.create(readStats, readerContext, readerParameters.isEmitDeletes(), fileGroupUpdateCallback, props); + this.spillThreshold = Math.max(0, props.getInteger(LSM_SORT_MERGE_SPILL_THRESHOLD.key(), LSM_SORT_MERGE_SPILL_THRESHOLD.defaultValue())); + this.spillBasePath = 
props.getString(SPILLABLE_MAP_BASE_PATH.key(), getDefaultSpillableMapBasePath()); + this.readers = new LoserTree<>(initializeReaders()); + } + + /** + * Builds one sorted-run reader for each sorted run that has at least one record. + * + * <p>The assigned {@code mergeOrder} is the stable source precedence used when multiple runs expose + * the same key. It is assigned before spill selection so direct and spilled iterators remain + * semantically identical during the loser-tree merge. + */ + private List<SortedRunReader<T>> initializeReaders() throws IOException { + List<SortedRunReader<T>> sortedRunReaders = new ArrayList<>(); + int mergeOrder = 0; + boolean hasBaseFileReader = includeBaseFile && inputSplit.getBaseFileOption().isPresent(); + if (hasBaseFileReader) { + addReader(sortedRunReaders, mergeOrder++, createBaseFileIterator(inputSplit.getBaseFileOption().get())); + } + + List<LogReaderSpec> logReaderSpecs = new ArrayList<>(); + for (HoodieLogFile logFile : inputSplit.getLogFiles()) { + logReaderSpecs.add(new LogReaderSpec(mergeOrder++, logFile)); + } + Set<Integer> directLogMergeOrders = selectDirectLogMergeOrders(logReaderSpecs, hasBaseFileReader); + for (LogReaderSpec spec : logReaderSpecs) { + ClosableIterator<BufferedRecord<T>> iterator = createFileIterator(spec.logFile.getPathInfo(), spec.logFile.getPath(), spec.logFile.getFileSize()); Review Comment: 🤖 If `createFileIterator` (or `addReader`/`maybeSpillIterator`) throws partway through this loop, the base reader and the log readers already opened (and any already-materialized spill files) are never closed: the constructor throws before `this.readers` is assigned, so the caller never receives an object to `close()`. Could the init be wrapped so already-opened readers are closed on failure? <sub><i>⚠️ AI-generated; verify before applying. 
React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java: ########## @@ -466,13 +502,52 @@ public static boolean isLogFile(StoragePath logPath) { } public static boolean isLogFile(String fileName) { + if (matchNativeLogFile(fileName).isPresent()) { + return true; + } if (fileName.startsWith(LOG_FILE_START_WITH_CHARACTER)) { Matcher matcher = LOG_FILE_PATTERN.matcher(fileName); return matcher.matches() && matcher.group(3).equals(LOG_FILE_EXTENSION); } return false; } + public static Option<Matcher> matchNativeLogFile(String fileName) { + if (StringUtils.isNullOrEmpty(fileName)) { + return Option.empty(); + } + String actualFileName = fileName.contains(StoragePath.SEPARATOR) + ? fileName.substring(fileName.lastIndexOf(StoragePath.SEPARATOR) + 1) + : fileName; + Matcher matcher = NATIVE_LOG_FILE_PATTERN.matcher(actualFileName); + return matcher.matches() ? Option.of(matcher) : Option.empty(); + } + + public static boolean isNativeDeleteLogFile(String fileName) { + return matchNativeLogFile(fileName).map(matcher -> "deletes".equals(matcher.group(8))).orElse(false); + } + + public static boolean isCDCLogFile(String fileName) { + if (StringUtils.isNullOrEmpty(fileName)) { + return false; + } + Option<Matcher> nativeLogMatcher = matchNativeLogFile(fileName); + if (nativeLogMatcher.isPresent()) { + return HoodieCDCUtils.CDC_LOGFILE_SUFFIX.substring(1).equals(nativeLogMatcher.get().group(8)); + } + Matcher matcher = LOG_FILE_PATTERN.matcher(getFileNameFromPath(fileName)); + return matcher.matches() && HoodieCDCUtils.CDC_LOGFILE_SUFFIX.equals(matcher.group(10)); Review Comment: 🤖 nit: the `.substring(1)` is non-obvious here — it drops the first character of `CDC_LOGFILE_SUFFIX` (its leading separator) so the remainder matches the bare extension captured by `NATIVE_LOG_FILE_PATTERN`. Could you either extract a constant (e.g. 
`CDC_EXTENSION = HoodieCDCUtils.CDC_LOGFILE_SUFFIX.substring(1)`) or add a brief inline comment like `// Drop the leading separator character of CDC_LOGFILE_SUFFIX to compare against the bare extension` so the intent is clear? <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemas.java: ########## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.schema; + +/** + * Factory class for {@link HoodieSchema}. Review Comment: 🤖 nit: the Javadoc says "Factory class" but the class only holds a static constant — no factory methods are present. Could you update this to something like "Predefined {@link HoodieSchema} constants" so it accurately describes what the class actually does? <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java: ########## @@ -0,0 +1,571 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.table.read.lsm; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemas; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.read.BaseFileUpdateCallback; +import org.apache.hudi.common.table.read.BufferedRecord; +import org.apache.hudi.common.table.read.BufferedRecordMerger; +import org.apache.hudi.common.table.read.BufferedRecordMergerFactory; +import org.apache.hudi.common.table.read.BufferedRecords; +import org.apache.hudi.common.table.read.HoodieReadStats; +import org.apache.hudi.common.table.read.InputSplit; +import org.apache.hudi.common.table.read.ReaderParameters; +import org.apache.hudi.common.table.read.UpdateProcessor; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.OrderingValues; +import org.apache.hudi.common.util.collection.ClosableIterator; +import 
org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.function.UnaryOperator; + +import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH; +import static org.apache.hudi.common.config.HoodieReaderConfig.LSM_SORT_MERGE_SPILL_THRESHOLD; +import static org.apache.hudi.common.schema.HoodieSchemaCompatibility.areSchemasProjectionEquivalent; +import static org.apache.hudi.io.util.FileIOUtils.getDefaultSpillableMapBasePath; + +/** + * Streaming sorted-merge iterator for RFC-103 LSM file groups. + * + * <p>The iterator merges one optional L1/base sorted run with zero or more L0/native parquet log + * runs. Every input file must already be sorted by record key; this class does not sort records + * within a file. The active head record from each participating run is tracked by a loser tree, + * which provides efficient k-way merge behavior while preserving deterministic source ordering for + * records with the same key. + * + * <p>Merge order follows the same conflict resolution model as {@code HoodieFileGroupReader}: the + * L1/base file is processed first, and L0 log files are processed in file-group log order so newer + * log instants or versions can win when ordering values tie. Native delete logs contain only delete + * metadata and are converted into {@link BufferedRecord} delete records before entering the merge. 
+ * + * <p>To reduce open-reader memory pressure when a file group has many L0 runs, the iterator can spill + * selected L0 file iterators to sequential temporary files. Spilling changes how an input run is + * buffered, but not its merge order or merge semantics. The L1/base iterator is always kept direct, + * and native delete logs plus smaller L0 files are prioritized for direct reading. + */ +public class LsmFileGroupRecordIterator<T> implements ClosableIterator<BufferedRecord<T>> { + + private static final String DELETE_LOG_RECORD_KEY_FIELD = "record_key"; + + private final HoodieReaderContext<T> readerContext; + private final HoodieStorage storage; + private final InputSplit inputSplit; + private final HoodieSchema readerSchema; + private final List<String> orderingFieldNames; + private final boolean includeBaseFile; + private final BufferedRecordMerger<T> bufferedRecordMerger; + private final UpdateProcessor<T> updateProcessor; + private final LoserTree<T> readers; + private final int spillThreshold; + private final String spillBasePath; + private BufferedRecord<T> nextRecord; + + /** + * Creates an iterator that merges both the L1/base file, when present, and all L0 native log files. + */ + public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext, + HoodieStorage storage, + InputSplit inputSplit, + List<String> orderingFieldNames, + HoodieTableMetaClient metaClient, + TypedProperties props, + ReaderParameters readerParameters, + HoodieReadStats readStats, + Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) throws IOException { + this(readerContext, storage, inputSplit, orderingFieldNames, metaClient, props, readerParameters, readStats, fileGroupUpdateCallback, true); + } + + /** + * Creates an iterator over an LSM file group. + * + * @param includeBaseFile whether the L1/base file should be included in the merge. Passing + * {@code false} produces a log-only view for callers that only need L0 data. 
+ */ + public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext, + HoodieStorage storage, + InputSplit inputSplit, + List<String> orderingFieldNames, + HoodieTableMetaClient metaClient, + TypedProperties props, + ReaderParameters readerParameters, + HoodieReadStats readStats, + Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback, + boolean includeBaseFile) throws IOException { + this.readerContext = readerContext; + this.storage = storage; + this.inputSplit = inputSplit; + this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema(); + this.orderingFieldNames = orderingFieldNames; + this.includeBaseFile = includeBaseFile; + this.bufferedRecordMerger = BufferedRecordMergerFactory.create( + readerContext, readerContext.getMergeMode(), false, readerContext.getRecordMerger(), + readerSchema, readerContext.getPayloadClasses(props), props, metaClient.getTableConfig().getPartialUpdateMode()); + this.updateProcessor = UpdateProcessor.create(readStats, readerContext, readerParameters.isEmitDeletes(), fileGroupUpdateCallback, props); + this.spillThreshold = Math.max(0, props.getInteger(LSM_SORT_MERGE_SPILL_THRESHOLD.key(), LSM_SORT_MERGE_SPILL_THRESHOLD.defaultValue())); + this.spillBasePath = props.getString(SPILLABLE_MAP_BASE_PATH.key(), getDefaultSpillableMapBasePath()); + this.readers = new LoserTree<>(initializeReaders()); + } + + /** + * Builds one reader state for each sorted run that has at least one record. + * + * <p>The assigned {@code mergeOrder} is the stable source precedence used when multiple runs expose + * the same key. It is assigned before spill selection so direct and spilled iterators remain + * semantically identical during the loser-tree merge. 
+ */ + private List<ReaderState<T>> initializeReaders() throws IOException { + List<ReaderState<T>> readerStates = new ArrayList<>(); + int mergeOrder = 0; + boolean hasBaseFileReader = includeBaseFile && inputSplit.getBaseFileOption().isPresent(); + if (hasBaseFileReader) { + addReader(readerStates, mergeOrder++, createBaseFileIterator(inputSplit.getBaseFileOption().get())); + } + + List<LogReaderSpec> logReaderSpecs = new ArrayList<>(); + for (HoodieLogFile logFile : inputSplit.getLogFiles()) { + logReaderSpecs.add(new LogReaderSpec(mergeOrder++, logFile)); + } + Set<Integer> directLogMergeOrders = selectDirectLogMergeOrders(logReaderSpecs, hasBaseFileReader); + for (LogReaderSpec spec : logReaderSpecs) { + ClosableIterator<BufferedRecord<T>> iterator = createFileIterator(spec.logFile.getPathInfo(), spec.logFile.getPath(), spec.logFile.getFileSize()); + addReader(readerStates, spec.mergeOrder, maybeSpillIterator(directLogMergeOrders.contains(spec.mergeOrder), iterator)); + } + return readerStates; + } + + /** + * Selects which L0 log readers stay direct under the configured spill threshold. + * + * <p>The base/L1 reader consumes one direct-reader slot when present and is never spilled. Remaining + * direct-reader budget is spent on native delete logs first, then smaller log files, because those + * readers tend to be cheaper to keep open while avoiding unnecessary spill materialization. + */ + private Set<Integer> selectDirectLogMergeOrders(List<LogReaderSpec> logReaderSpecs, boolean hasBaseFileReader) { + int directLogBudget = spillThreshold - (hasBaseFileReader ? 
1 : 0); + if (directLogBudget <= 0) { + return new HashSet<>(); + } + Set<Integer> directMergeOrders = new HashSet<>(); + logReaderSpecs.stream() + .sorted(Comparator + .comparing((LogReaderSpec spec) -> !spec.nativeDeleteLog) + .thenComparingLong(spec -> spec.fileSize) + .thenComparingInt(spec -> spec.mergeOrder)) + .limit(directLogBudget) + .forEach(spec -> directMergeOrders.add(spec.mergeOrder)); + return directMergeOrders; + } + + /** + * Returns the original iterator when it is selected for direct reading, otherwise materializes it + * into a sequential spill iterator. + */ + private ClosableIterator<BufferedRecord<T>> maybeSpillIterator(boolean directReader, + ClosableIterator<BufferedRecord<T>> iterator) { + if (directReader) { + return iterator; + } + return new SpillableLsmRecordIterator<>(iterator, readerContext.getRecordSerializer(), readerContext.getRecordContext(), spillBasePath); + } + + /** + * Metadata used only for choosing the direct-versus-spilled L0 reader plan. + */ + private static class LogReaderSpec { + private final int mergeOrder; + private final HoodieLogFile logFile; + private final boolean nativeDeleteLog; + private final long fileSize; + + private LogReaderSpec(int mergeOrder, HoodieLogFile logFile) { + this.mergeOrder = mergeOrder; + this.logFile = logFile; + this.nativeDeleteLog = FSUtils.isNativeDeleteLogFile(logFile.getFileName()); + this.fileSize = logFile.getFileSize() >= 0 ? logFile.getFileSize() : Long.MAX_VALUE; + } + } + + /** + * Adds a reader to the merge only when the underlying sorted run contains at least one record. 
+ */ + private void addReader(List<ReaderState<T>> readerStates, int mergeOrder, ClosableIterator<BufferedRecord<T>> iterator) { + ReaderState<T> readerState = new ReaderState<>(mergeOrder, iterator); + if (readerState.advance()) { + readerStates.add(readerState); + } else { + readerState.close(); + } + } + + /** + * Creates the L1/base sorted-run iterator, using the bootstrap base file when the base file is + * bootstrapped. + */ + private ClosableIterator<BufferedRecord<T>> createBaseFileIterator(HoodieBaseFile baseFile) throws IOException { + BaseFile file = baseFile.getBootstrapBaseFile().orElse(baseFile); Review Comment: 🤖 Worth digging into — `makeBootstrapBaseFileIterator` reads the skeleton file too, and that's where the hoodie meta columns (including `_hoodie_record_key`) come from; the bootstrap data file alone doesn't carry them. Since the LSM merge is keyed on the record key, if this path skips the skeleton/`mergeBootstrapReaders` step the key column would be absent and the merge could misbehave. Could you confirm how the record key is obtained for bootstrap base files here? ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java: ########## @@ -0,0 +1,539 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.table.read.lsm; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemas; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.read.BaseFileUpdateCallback; +import org.apache.hudi.common.table.read.BufferedRecord; +import org.apache.hudi.common.table.read.BufferedRecordMerger; +import org.apache.hudi.common.table.read.BufferedRecordMergerFactory; +import org.apache.hudi.common.table.read.BufferedRecords; +import org.apache.hudi.common.table.read.HoodieReadStats; +import org.apache.hudi.common.table.read.InputSplit; +import org.apache.hudi.common.table.read.ReaderParameters; +import org.apache.hudi.common.table.read.UpdateProcessor; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.function.UnaryOperator; + +import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH; +import static org.apache.hudi.common.config.HoodieReaderConfig.LSM_SORT_MERGE_SPILL_THRESHOLD; +import static org.apache.hudi.common.schema.HoodieSchemaCompatibility.areSchemasProjectionEquivalent; +import static org.apache.hudi.io.util.FileIOUtils.getDefaultSpillableMapBasePath; + +/** + * Streaming sorted-merge iterator for RFC-103 LSM file groups. + * + * <p>The iterator merges one optional L1/base sorted run with zero or more L0/native parquet log + * runs. Every input file must already be sorted by record key; this class does not sort records + * within a file. The active head record from each participating run is tracked by a loser tree, + * which provides efficient k-way merge behavior while preserving deterministic source ordering for + * records with the same key. + * + * <p>Merge order follows the same conflict resolution model as {@code HoodieFileGroupReader}: the + * L1/base file is processed first, and L0 log files are processed in file-group log order so newer + * log instants or versions can win when ordering values tie. Native delete logs contain only delete + * metadata and are converted into {@link BufferedRecord} delete records before entering the merge. + * + * <p>To reduce open-reader memory pressure when a file group has many L0 runs, the iterator can spill + * selected L0 file iterators to sequential temporary files. Spilling changes how an input run is + * buffered, but not its merge order or merge semantics. The L1/base iterator is always kept direct, + * and native delete logs plus smaller L0 files are prioritized for direct reading. 
+ */ +public class LsmFileGroupRecordIterator<T> implements ClosableIterator<BufferedRecord<T>> { + + private static final String DELETE_LOG_RECORD_KEY_FIELD = "record_key"; + private static final List<String> DELETE_LOG_ORDERING_FIELD_NAMES = Arrays.asList("ordering_val"); + + private final HoodieReaderContext<T> readerContext; + private final HoodieStorage storage; + private final InputSplit inputSplit; + private final HoodieSchema readerSchema; + private final List<String> orderingFieldNames; + private final boolean includeBaseFile; + private final BufferedRecordMerger<T> bufferedRecordMerger; + private final UpdateProcessor<T> updateProcessor; + private final LoserTree<T> readers; + private final int spillThreshold; + private final String spillBasePath; + private BufferedRecord<T> nextRecord; + + /** + * Creates an iterator that merges both the L1/base file, when present, and all L0 native log files. + */ + public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext, + HoodieStorage storage, + InputSplit inputSplit, + List<String> orderingFieldNames, + HoodieTableMetaClient metaClient, + TypedProperties props, + ReaderParameters readerParameters, + HoodieReadStats readStats, + Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) throws IOException { + this(readerContext, storage, inputSplit, orderingFieldNames, metaClient, props, readerParameters, readStats, fileGroupUpdateCallback, true); + } + + /** + * Creates an iterator over an LSM file group. + * + * @param includeBaseFile whether the L1/base file should be included in the merge. Passing + * {@code false} produces a log-only view for callers that only need L0 data. 
+ */ + public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext, + HoodieStorage storage, + InputSplit inputSplit, + List<String> orderingFieldNames, + HoodieTableMetaClient metaClient, + TypedProperties props, + ReaderParameters readerParameters, + HoodieReadStats readStats, + Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback, + boolean includeBaseFile) throws IOException { + this.readerContext = readerContext; + this.storage = storage; + this.inputSplit = inputSplit; + this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema(); + this.orderingFieldNames = orderingFieldNames; + this.includeBaseFile = includeBaseFile; + this.bufferedRecordMerger = BufferedRecordMergerFactory.create( + readerContext, readerContext.getMergeMode(), false, readerContext.getRecordMerger(), + readerSchema, readerContext.getPayloadClasses(props), props, metaClient.getTableConfig().getPartialUpdateMode()); + this.updateProcessor = UpdateProcessor.create(readStats, readerContext, readerParameters.isEmitDeletes(), fileGroupUpdateCallback, props); + this.spillThreshold = Math.max(0, props.getInteger(LSM_SORT_MERGE_SPILL_THRESHOLD.key(), LSM_SORT_MERGE_SPILL_THRESHOLD.defaultValue())); + this.spillBasePath = props.getString(SPILLABLE_MAP_BASE_PATH.key(), getDefaultSpillableMapBasePath()); + this.readers = new LoserTree<>(initializeReaders()); + } + + /** + * Builds one sorted-run reader for each sorted run that has at least one record. + * + * <p>The assigned {@code mergeOrder} is the stable source precedence used when multiple runs expose + * the same key. It is assigned before spill selection so direct and spilled iterators remain + * semantically identical during the loser-tree merge. 
+ */ + private List<SortedRunReader<T>> initializeReaders() throws IOException { + List<SortedRunReader<T>> sortedRunReaders = new ArrayList<>(); + int mergeOrder = 0; + boolean hasBaseFileReader = includeBaseFile && inputSplit.getBaseFileOption().isPresent(); + if (hasBaseFileReader) { + addReader(sortedRunReaders, mergeOrder++, createBaseFileIterator(inputSplit.getBaseFileOption().get())); + } + + List<LogReaderSpec> logReaderSpecs = new ArrayList<>(); + for (HoodieLogFile logFile : inputSplit.getLogFiles()) { + logReaderSpecs.add(new LogReaderSpec(mergeOrder++, logFile)); + } + Set<Integer> directLogMergeOrders = selectDirectLogMergeOrders(logReaderSpecs, hasBaseFileReader); + for (LogReaderSpec spec : logReaderSpecs) { + ClosableIterator<BufferedRecord<T>> iterator = createFileIterator(spec.logFile.getPathInfo(), spec.logFile.getPath(), spec.logFile.getFileSize()); + addReader(sortedRunReaders, spec.mergeOrder, maybeSpillIterator(directLogMergeOrders.contains(spec.mergeOrder), iterator)); + } + return sortedRunReaders; + } + + /** + * Selects which L0 log readers stay direct under the configured spill threshold. + * + * <p>The base/L1 reader consumes one direct-reader slot when present and is never spilled. Remaining + * direct-reader budget is spent on native delete logs first, then smaller log files, because those + * readers tend to be cheaper to keep open while avoiding unnecessary spill materialization. + */ + private Set<Integer> selectDirectLogMergeOrders(List<LogReaderSpec> logReaderSpecs, boolean hasBaseFileReader) { + return selectDirectLogMergeOrders(logReaderSpecs, hasBaseFileReader, spillThreshold); + } + + @VisibleForTesting + static Set<Integer> selectDirectLogMergeOrders(List<LogReaderSpec> logReaderSpecs, + boolean hasBaseFileReader, + int spillThreshold) { + int directLogBudget = spillThreshold - (hasBaseFileReader ? 
1 : 0); + if (directLogBudget <= 0) { + return new HashSet<>(); + } + Set<Integer> directMergeOrders = new HashSet<>(); + logReaderSpecs.stream() + .sorted(Comparator + .comparing((LogReaderSpec spec) -> !spec.nativeDeleteLog) + .thenComparingLong(spec -> spec.fileSize) + .thenComparingInt(spec -> spec.mergeOrder)) + .limit(directLogBudget) + .forEach(spec -> directMergeOrders.add(spec.mergeOrder)); + return directMergeOrders; + } + + /** + * Returns the original iterator when it is selected for direct reading, otherwise materializes it + * into a sequential spill iterator. + */ + private ClosableIterator<BufferedRecord<T>> maybeSpillIterator(boolean directReader, + ClosableIterator<BufferedRecord<T>> iterator) { + if (directReader) { + return iterator; + } + return new SpillableLsmRecordIterator<>(iterator, readerContext.getRecordSerializer(), readerContext.getRecordContext(), spillBasePath); + } + + /** + * Metadata used only for choosing the direct-versus-spilled L0 reader plan. + */ + @VisibleForTesting + static class LogReaderSpec { + final int mergeOrder; + final HoodieLogFile logFile; + final boolean nativeDeleteLog; + final long fileSize; + + LogReaderSpec(int mergeOrder, HoodieLogFile logFile) { + this.mergeOrder = mergeOrder; + this.logFile = logFile; + this.nativeDeleteLog = FSUtils.isNativeDeleteLogFile(logFile.getFileName()); + this.fileSize = logFile.getFileSize() >= 0 ? logFile.getFileSize() : Long.MAX_VALUE; + } + } + + /** + * Adds a reader to the merge only when the underlying sorted run contains at least one record. + */ + private void addReader(List<SortedRunReader<T>> sortedRunReaders, int mergeOrder, ClosableIterator<BufferedRecord<T>> iterator) { + SortedRunReader<T> sortedRunReader = new SortedRunReader<>(mergeOrder, iterator); + if (sortedRunReader.advance()) { + sortedRunReaders.add(sortedRunReader); + } else { + sortedRunReader.close(); + } + } + + /** + * Creates the L1/base sorted-run iterator. 
+ */ + private ClosableIterator<BufferedRecord<T>> createBaseFileIterator(HoodieBaseFile baseFile) throws IOException { + if (baseFile.getBootstrapBaseFile().isPresent()) { + // Bootstrap base files require joining the skeleton file with the external data file. + // Keep that path on HoodieFileGroupReader until the LSM reader implements the same merge. + throw new UnsupportedOperationException("LSM file group reader does not support bootstrap base files"); + } + return createFileIterator(baseFile.getPathInfo(), baseFile.getStoragePath(), baseFile.getFileSize()); + } + + /** + * Creates a sorted-run iterator for a parquet data file or a native parquet log file. + * + * <p>Native delete logs use a specialized schema and are routed through + * {@link #createNativeDeleteLogIterator(StoragePathInfo, StoragePath, long)}. + */ + private ClosableIterator<BufferedRecord<T>> createFileIterator(StoragePathInfo pathInfo, + StoragePath path, + long fileSize) throws IOException { + StoragePath storagePath = pathInfo != null ? pathInfo.getPath() : path; + if (FSUtils.isNativeDeleteLogFile(storagePath.getName())) { + return createNativeDeleteLogIterator(pathInfo, storagePath, fileSize); + } + Pair<HoodieSchema, Map<String, String>> requiredSchemaAndRenamedColumns = + readerContext.getSchemaHandler().getRequiredSchemaForFileAndRenamedColumns(storagePath); + HoodieSchema fileRequiredSchema = requiredSchemaAndRenamedColumns.getLeft(); + ClosableIterator<T> recordIterator; + if (pathInfo != null) { + recordIterator = readerContext.getFileRecordIterator( + pathInfo, 0, pathInfo.getLength(), readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage); + } else { + long length = fileSize >= 0 ? 
fileSize : storage.getPathInfo(storagePath).getLength(); + recordIterator = readerContext.getFileRecordIterator( + storagePath, 0, length, readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage); + } + if (!areSchemasProjectionEquivalent(fileRequiredSchema, readerSchema) || !requiredSchemaAndRenamedColumns.getRight().isEmpty()) { + UnaryOperator<T> projector = readerContext.getRecordContext() + .projectRecord(fileRequiredSchema, readerSchema, requiredSchemaAndRenamedColumns.getRight()); + recordIterator = new CloseableMappingIterator<>(recordIterator, projector); + } + if (readerContext.getInstantRange().isPresent()) { + recordIterator = readerContext.applyInstantRangeFilter(recordIterator); + } + return new CloseableMappingIterator<>(recordIterator, record -> BufferedRecords.fromEngineRecord( + readerContext.getRecordContext().seal(record), + readerSchema, + readerContext.getRecordContext(), + orderingFieldNames, + readerContext.getRecordContext().isDeleteRecord(record, readerContext.getSchemaHandler().getDeleteContext().withReaderSchema(readerSchema)))); + } + + /** + * Creates delete records from an RFC-103 native delete log. + * + * <p>The delete log schema intentionally contains only the record key and ordering value fields; + * partition path and full data columns are not read for delete-only logs. + */ + private ClosableIterator<BufferedRecord<T>> createNativeDeleteLogIterator(StoragePathInfo pathInfo, + StoragePath storagePath, + long fileSize) throws IOException { + ClosableIterator<T> recordIterator; + if (pathInfo != null) { + recordIterator = readerContext.getFileRecordIterator( + pathInfo, 0, pathInfo.getLength(), HoodieSchemas.DELETE_LOG_SCHEMA, HoodieSchemas.DELETE_LOG_SCHEMA, storage); + } else { + long length = fileSize >= 0 ? 
fileSize : storage.getPathInfo(storagePath).getLength(); + recordIterator = readerContext.getFileRecordIterator( + storagePath, 0, length, HoodieSchemas.DELETE_LOG_SCHEMA, HoodieSchemas.DELETE_LOG_SCHEMA, storage); + } + return new CloseableMappingIterator<>(recordIterator, record -> { + return createNativeDeleteRecord(readerContext, record); + }); + } + + @VisibleForTesting + static <T> BufferedRecord<T> createNativeDeleteRecord(HoodieReaderContext<T> readerContext, T record) { + Object recordKey = readerContext.getRecordContext().getValue(record, HoodieSchemas.DELETE_LOG_SCHEMA, DELETE_LOG_RECORD_KEY_FIELD); + // Preserve the delete log's ordering value so event-time/custom merge modes can compare + // deletes against data records instead of treating every native delete as commit-time ordered. + Comparable orderingValue = + readerContext.getRecordContext().getOrderingValue(record, HoodieSchemas.DELETE_LOG_SCHEMA, DELETE_LOG_ORDERING_FIELD_NAMES); + return BufferedRecords.createDelete(recordKey.toString(), orderingValue); + } + + @Override + public boolean hasNext() { + if (nextRecord != null) { + return true; + } + while (!readers.isEmpty()) { + BufferedRecord<T> mergedRecord = nextMergedRecord(); + nextRecord = updateProcessor.processUpdate( + mergedRecord.getRecordKey(), null, mergedRecord, mergedRecord.isDelete()); Review Comment: 🤖 `processUpdate` is always called with `null` as the previous record, but `HoodieFileGroupReader.hasNextBaseRecord` passes the base record there (and skips `processUpdate` entirely for base-only records). As written, base-only and base+log records are all counted as inserts (numUpdates stays 0) and `BaseFileUpdateCallback.onUpdate`/`onDelete` receive a null before-image — which breaks merge-handle stats and secondary-index/RLI maintenance. @nsivabalan should the base run's record be threaded through as the previous version here? <sub><i>⚠️ AI-generated; verify before applying. 
React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java: ########## @@ -0,0 +1,539 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.table.read.lsm; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemas; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.read.BaseFileUpdateCallback; +import org.apache.hudi.common.table.read.BufferedRecord; +import org.apache.hudi.common.table.read.BufferedRecordMerger; +import org.apache.hudi.common.table.read.BufferedRecordMergerFactory; +import org.apache.hudi.common.table.read.BufferedRecords; +import org.apache.hudi.common.table.read.HoodieReadStats; +import org.apache.hudi.common.table.read.InputSplit; +import org.apache.hudi.common.table.read.ReaderParameters; +import 
org.apache.hudi.common.table.read.UpdateProcessor; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.function.UnaryOperator; + +import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH; +import static org.apache.hudi.common.config.HoodieReaderConfig.LSM_SORT_MERGE_SPILL_THRESHOLD; +import static org.apache.hudi.common.schema.HoodieSchemaCompatibility.areSchemasProjectionEquivalent; +import static org.apache.hudi.io.util.FileIOUtils.getDefaultSpillableMapBasePath; + +/** + * Streaming sorted-merge iterator for RFC-103 LSM file groups. + * + * <p>The iterator merges one optional L1/base sorted run with zero or more L0/native parquet log + * runs. Every input file must already be sorted by record key; this class does not sort records + * within a file. The active head record from each participating run is tracked by a loser tree, + * which provides efficient k-way merge behavior while preserving deterministic source ordering for + * records with the same key. + * + * <p>Merge order follows the same conflict resolution model as {@code HoodieFileGroupReader}: the + * L1/base file is processed first, and L0 log files are processed in file-group log order so newer + * log instants or versions can win when ordering values tie. 
Native delete logs contain only delete + * metadata and are converted into {@link BufferedRecord} delete records before entering the merge. + * + * <p>To reduce open-reader memory pressure when a file group has many L0 runs, the iterator can spill + * selected L0 file iterators to sequential temporary files. Spilling changes how an input run is + * buffered, but not its merge order or merge semantics. The L1/base iterator is always kept direct, + * and native delete logs plus smaller L0 files are prioritized for direct reading. + */ +public class LsmFileGroupRecordIterator<T> implements ClosableIterator<BufferedRecord<T>> { + + private static final String DELETE_LOG_RECORD_KEY_FIELD = "record_key"; + private static final List<String> DELETE_LOG_ORDERING_FIELD_NAMES = Arrays.asList("ordering_val"); + + private final HoodieReaderContext<T> readerContext; + private final HoodieStorage storage; + private final InputSplit inputSplit; + private final HoodieSchema readerSchema; + private final List<String> orderingFieldNames; + private final boolean includeBaseFile; + private final BufferedRecordMerger<T> bufferedRecordMerger; + private final UpdateProcessor<T> updateProcessor; + private final LoserTree<T> readers; + private final int spillThreshold; + private final String spillBasePath; + private BufferedRecord<T> nextRecord; + + /** + * Creates an iterator that merges both the L1/base file, when present, and all L0 native log files. 
+ */ + public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext, + HoodieStorage storage, + InputSplit inputSplit, + List<String> orderingFieldNames, + HoodieTableMetaClient metaClient, + TypedProperties props, + ReaderParameters readerParameters, + HoodieReadStats readStats, + Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) throws IOException { + this(readerContext, storage, inputSplit, orderingFieldNames, metaClient, props, readerParameters, readStats, fileGroupUpdateCallback, true); + } + + /** + * Creates an iterator over an LSM file group. + * + * @param includeBaseFile whether the L1/base file should be included in the merge. Passing + * {@code false} produces a log-only view for callers that only need L0 data. + */ + public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext, + HoodieStorage storage, + InputSplit inputSplit, + List<String> orderingFieldNames, + HoodieTableMetaClient metaClient, + TypedProperties props, + ReaderParameters readerParameters, + HoodieReadStats readStats, + Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback, + boolean includeBaseFile) throws IOException { + this.readerContext = readerContext; + this.storage = storage; + this.inputSplit = inputSplit; + this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema(); + this.orderingFieldNames = orderingFieldNames; + this.includeBaseFile = includeBaseFile; + this.bufferedRecordMerger = BufferedRecordMergerFactory.create( + readerContext, readerContext.getMergeMode(), false, readerContext.getRecordMerger(), + readerSchema, readerContext.getPayloadClasses(props), props, metaClient.getTableConfig().getPartialUpdateMode()); + this.updateProcessor = UpdateProcessor.create(readStats, readerContext, readerParameters.isEmitDeletes(), fileGroupUpdateCallback, props); + this.spillThreshold = Math.max(0, props.getInteger(LSM_SORT_MERGE_SPILL_THRESHOLD.key(), LSM_SORT_MERGE_SPILL_THRESHOLD.defaultValue())); + this.spillBasePath = 
props.getString(SPILLABLE_MAP_BASE_PATH.key(), getDefaultSpillableMapBasePath()); + this.readers = new LoserTree<>(initializeReaders()); + } + + /** + * Builds one sorted-run reader for each sorted run that has at least one record. + * + * <p>The assigned {@code mergeOrder} is the stable source precedence used when multiple runs expose + * the same key. It is assigned before spill selection so direct and spilled iterators remain + * semantically identical during the loser-tree merge. + */ + private List<SortedRunReader<T>> initializeReaders() throws IOException { + List<SortedRunReader<T>> sortedRunReaders = new ArrayList<>(); + int mergeOrder = 0; + boolean hasBaseFileReader = includeBaseFile && inputSplit.getBaseFileOption().isPresent(); + if (hasBaseFileReader) { + addReader(sortedRunReaders, mergeOrder++, createBaseFileIterator(inputSplit.getBaseFileOption().get())); + } + + List<LogReaderSpec> logReaderSpecs = new ArrayList<>(); + for (HoodieLogFile logFile : inputSplit.getLogFiles()) { + logReaderSpecs.add(new LogReaderSpec(mergeOrder++, logFile)); + } + Set<Integer> directLogMergeOrders = selectDirectLogMergeOrders(logReaderSpecs, hasBaseFileReader); + for (LogReaderSpec spec : logReaderSpecs) { + ClosableIterator<BufferedRecord<T>> iterator = createFileIterator(spec.logFile.getPathInfo(), spec.logFile.getPath(), spec.logFile.getFileSize()); + addReader(sortedRunReaders, spec.mergeOrder, maybeSpillIterator(directLogMergeOrders.contains(spec.mergeOrder), iterator)); + } + return sortedRunReaders; + } + + /** + * Selects which L0 log readers stay direct under the configured spill threshold. + * + * <p>The base/L1 reader consumes one direct-reader slot when present and is never spilled. Remaining + * direct-reader budget is spent on native delete logs first, then smaller log files, because those + * readers tend to be cheaper to keep open while avoiding unnecessary spill materialization. 
+ */ + private Set<Integer> selectDirectLogMergeOrders(List<LogReaderSpec> logReaderSpecs, boolean hasBaseFileReader) { + return selectDirectLogMergeOrders(logReaderSpecs, hasBaseFileReader, spillThreshold); + } + + @VisibleForTesting + static Set<Integer> selectDirectLogMergeOrders(List<LogReaderSpec> logReaderSpecs, + boolean hasBaseFileReader, + int spillThreshold) { + int directLogBudget = spillThreshold - (hasBaseFileReader ? 1 : 0); + if (directLogBudget <= 0) { + return new HashSet<>(); + } + Set<Integer> directMergeOrders = new HashSet<>(); + logReaderSpecs.stream() + .sorted(Comparator + .comparing((LogReaderSpec spec) -> !spec.nativeDeleteLog) + .thenComparingLong(spec -> spec.fileSize) + .thenComparingInt(spec -> spec.mergeOrder)) + .limit(directLogBudget) + .forEach(spec -> directMergeOrders.add(spec.mergeOrder)); + return directMergeOrders; + } + + /** + * Returns the original iterator when it is selected for direct reading, otherwise materializes it + * into a sequential spill iterator. + */ + private ClosableIterator<BufferedRecord<T>> maybeSpillIterator(boolean directReader, + ClosableIterator<BufferedRecord<T>> iterator) { + if (directReader) { + return iterator; + } + return new SpillableLsmRecordIterator<>(iterator, readerContext.getRecordSerializer(), readerContext.getRecordContext(), spillBasePath); + } + + /** + * Metadata used only for choosing the direct-versus-spilled L0 reader plan. + */ + @VisibleForTesting + static class LogReaderSpec { + final int mergeOrder; + final HoodieLogFile logFile; + final boolean nativeDeleteLog; + final long fileSize; + + LogReaderSpec(int mergeOrder, HoodieLogFile logFile) { + this.mergeOrder = mergeOrder; + this.logFile = logFile; + this.nativeDeleteLog = FSUtils.isNativeDeleteLogFile(logFile.getFileName()); + this.fileSize = logFile.getFileSize() >= 0 ? logFile.getFileSize() : Long.MAX_VALUE; + } + } + + /** + * Adds a reader to the merge only when the underlying sorted run contains at least one record. 
+ */ + private void addReader(List<SortedRunReader<T>> sortedRunReaders, int mergeOrder, ClosableIterator<BufferedRecord<T>> iterator) { + SortedRunReader<T> sortedRunReader = new SortedRunReader<>(mergeOrder, iterator); + if (sortedRunReader.advance()) { + sortedRunReaders.add(sortedRunReader); + } else { + sortedRunReader.close(); + } + } + + /** + * Creates the L1/base sorted-run iterator. + */ + private ClosableIterator<BufferedRecord<T>> createBaseFileIterator(HoodieBaseFile baseFile) throws IOException { + if (baseFile.getBootstrapBaseFile().isPresent()) { + // Bootstrap base files require joining the skeleton file with the external data file. + // Keep that path on HoodieFileGroupReader until the LSM reader implements the same merge. + throw new UnsupportedOperationException("LSM file group reader does not support bootstrap base files"); + } + return createFileIterator(baseFile.getPathInfo(), baseFile.getStoragePath(), baseFile.getFileSize()); + } + + /** + * Creates a sorted-run iterator for a parquet data file or a native parquet log file. + * + * <p>Native delete logs use a specialized schema and are routed through + * {@link #createNativeDeleteLogIterator(StoragePathInfo, StoragePath, long)}. + */ + private ClosableIterator<BufferedRecord<T>> createFileIterator(StoragePathInfo pathInfo, + StoragePath path, + long fileSize) throws IOException { + StoragePath storagePath = pathInfo != null ? 
pathInfo.getPath() : path; + if (FSUtils.isNativeDeleteLogFile(storagePath.getName())) { + return createNativeDeleteLogIterator(pathInfo, storagePath, fileSize); + } + Pair<HoodieSchema, Map<String, String>> requiredSchemaAndRenamedColumns = + readerContext.getSchemaHandler().getRequiredSchemaForFileAndRenamedColumns(storagePath); + HoodieSchema fileRequiredSchema = requiredSchemaAndRenamedColumns.getLeft(); + ClosableIterator<T> recordIterator; + if (pathInfo != null) { + recordIterator = readerContext.getFileRecordIterator( + pathInfo, 0, pathInfo.getLength(), readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage); + } else { + long length = fileSize >= 0 ? fileSize : storage.getPathInfo(storagePath).getLength(); + recordIterator = readerContext.getFileRecordIterator( + storagePath, 0, length, readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage); + } + if (!areSchemasProjectionEquivalent(fileRequiredSchema, readerSchema) || !requiredSchemaAndRenamedColumns.getRight().isEmpty()) { + UnaryOperator<T> projector = readerContext.getRecordContext() + .projectRecord(fileRequiredSchema, readerSchema, requiredSchemaAndRenamedColumns.getRight()); + recordIterator = new CloseableMappingIterator<>(recordIterator, projector); + } + if (readerContext.getInstantRange().isPresent()) { + recordIterator = readerContext.applyInstantRangeFilter(recordIterator); + } + return new CloseableMappingIterator<>(recordIterator, record -> BufferedRecords.fromEngineRecord( + readerContext.getRecordContext().seal(record), + readerSchema, + readerContext.getRecordContext(), + orderingFieldNames, + readerContext.getRecordContext().isDeleteRecord(record, readerContext.getSchemaHandler().getDeleteContext().withReaderSchema(readerSchema)))); + } + + /** + * Creates delete records from an RFC-103 native delete log. 
+ * + * <p>The delete log schema intentionally contains only the record key and ordering value fields; + * partition path and full data columns are not read for delete-only logs. + */ + private ClosableIterator<BufferedRecord<T>> createNativeDeleteLogIterator(StoragePathInfo pathInfo, + StoragePath storagePath, + long fileSize) throws IOException { + ClosableIterator<T> recordIterator; + if (pathInfo != null) { + recordIterator = readerContext.getFileRecordIterator( + pathInfo, 0, pathInfo.getLength(), HoodieSchemas.DELETE_LOG_SCHEMA, HoodieSchemas.DELETE_LOG_SCHEMA, storage); + } else { + long length = fileSize >= 0 ? fileSize : storage.getPathInfo(storagePath).getLength(); + recordIterator = readerContext.getFileRecordIterator( + storagePath, 0, length, HoodieSchemas.DELETE_LOG_SCHEMA, HoodieSchemas.DELETE_LOG_SCHEMA, storage); + } + return new CloseableMappingIterator<>(recordIterator, record -> { + return createNativeDeleteRecord(readerContext, record); + }); + } + + @VisibleForTesting + static <T> BufferedRecord<T> createNativeDeleteRecord(HoodieReaderContext<T> readerContext, T record) { + Object recordKey = readerContext.getRecordContext().getValue(record, HoodieSchemas.DELETE_LOG_SCHEMA, DELETE_LOG_RECORD_KEY_FIELD); + // Preserve the delete log's ordering value so event-time/custom merge modes can compare + // deletes against data records instead of treating every native delete as commit-time ordered. 
+ Comparable orderingValue = + readerContext.getRecordContext().getOrderingValue(record, HoodieSchemas.DELETE_LOG_SCHEMA, DELETE_LOG_ORDERING_FIELD_NAMES); + return BufferedRecords.createDelete(recordKey.toString(), orderingValue); + } + + @Override + public boolean hasNext() { + if (nextRecord != null) { + return true; + } + while (!readers.isEmpty()) { + BufferedRecord<T> mergedRecord = nextMergedRecord(); + nextRecord = updateProcessor.processUpdate( + mergedRecord.getRecordKey(), null, mergedRecord, mergedRecord.isDelete()); + if (nextRecord != null) { + return true; + } + } + return false; + } + + /** + * Pops and merges all currently visible versions for the next record key. + */ + private BufferedRecord<T> nextMergedRecord() { + BufferedRecord<T> firstRecord = readers.peekWinner(); + String recordKey = firstRecord.getRecordKey(); + BufferedRecord<T> mergedRecord = null; + while (!readers.isEmpty() && recordKey.equals(readers.peekWinner().getRecordKey())) { + mergedRecord = merge(mergedRecord, readers.popWinner()); + } + return mergedRecord; + } + + /** + * Applies the configured buffered-record merger so later sources can win when ordering values tie. + */ + private BufferedRecord<T> merge(BufferedRecord<T> existingRecord, BufferedRecord<T> newRecord) { + if (existingRecord == null) { + return newRecord; + } + try { + return bufferedRecordMerger.deltaMerge(newRecord, existingRecord).orElse(existingRecord); Review Comment: 🤖 Because the base file is treated as just another sorted run, this merge only ever calls `deltaMerge`, never `finalMerge`. For commit-time/event-time/custom and the partial-update mergers the two are outcome-equivalent (I checked), but `ExpressionPayloadRecordMerger` deliberately uses a different (pre-combine) merger for `deltaMerge` than for `finalMerge`. Could base↔log merges produce wrong results under ExpressionPayload (e.g. MERGE INTO) on an LSM table, and is that mode in scope for this reader? 
<sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/HoodieLsmFileGroupReader.java: ########## @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.common.table.read.lsm; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.read.BaseFileUpdateCallback; +import org.apache.hudi.common.table.read.BufferedRecord; +import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler; +import org.apache.hudi.common.table.read.HoodieReadStats; +import org.apache.hudi.common.table.read.HoodieRecordReader; +import org.apache.hudi.common.table.read.InputSplit; +import org.apache.hudi.common.table.read.IteratorMode; +import org.apache.hudi.common.table.read.ParquetRowIndexBasedSchemaHandler; +import org.apache.hudi.common.table.read.ReaderParameters; +import org.apache.hudi.common.util.ConfigUtils; +import org.apache.hudi.common.util.HoodieRecordUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; + +import lombok.Builder; +import lombok.Getter; + +import java.io.IOException; +import java.util.List; +import java.util.function.UnaryOperator; +import java.util.stream.Stream; + +/** + * Record reader for RFC-103 LSM file groups backed by native parquet log files. 
+ * + * <p>This reader is intentionally separate from {@code HoodieFileGroupReader}. Callers should use it + * only when the file group follows pure LSM sorted-file semantics: the optional base file is treated + * as the L1 sorted run and native parquet log files are treated as L0 sorted runs. Mixed legacy log + * file groups should continue to use {@code HoodieFileGroupReader}. + * + * <p>The reader owns file-group level setup that mirrors {@code HoodieFileGroupReader}: schema + * handling, merge properties, iterator mode, output projection, read stats, and update callbacks. + * The actual sorted k-way merge is delegated to {@link LsmFileGroupRecordIterator}. + */ +public final class HoodieLsmFileGroupReader<T> implements HoodieRecordReader<T> { + + private final HoodieReaderContext<T> readerContext; + private final HoodieTableMetaClient metaClient; + private final InputSplit inputSplit; + private final List<String> orderingFieldNames; + private final HoodieStorage storage; + private final TypedProperties props; + private final ReaderParameters readerParameters; + private final Option<UnaryOperator<T>> outputConverter; + private final Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback; + private ClosableIterator<BufferedRecord<T>> lsmRecordIterator; + @Getter + private final HoodieReadStats readStats; + + @Builder(setterPrefix = "with") + private HoodieLsmFileGroupReader( + HoodieReaderContext<T> readerContext, + String latestCommitTime, + HoodieSchema dataSchema, + HoodieSchema requestedSchema, + Option<InternalSchema> internalSchemaOpt, + HoodieTableMetaClient hoodieTableMetaClient, + TypedProperties props, + Option<HoodieBaseFile> baseFileOption, + Stream<HoodieLogFile> logFiles, + String partitionPath, + Long start, + Long length, + Boolean allowInflightInstants, + Boolean emitDelete, Review Comment: 🤖 nit: the parameter is named `emitDelete` (singular) but the downstream builder setter it feeds is `ReaderParameters.builder().emitDeletes()` (plural). 
Could you rename it to `emitDeletes` to keep the two consistent and reduce the small surprise when reading the builder call a few lines below? <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java: ########## @@ -0,0 +1,571 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.common.table.read.lsm; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemas; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.read.BaseFileUpdateCallback; +import org.apache.hudi.common.table.read.BufferedRecord; +import org.apache.hudi.common.table.read.BufferedRecordMerger; +import org.apache.hudi.common.table.read.BufferedRecordMergerFactory; +import org.apache.hudi.common.table.read.BufferedRecords; +import org.apache.hudi.common.table.read.HoodieReadStats; +import org.apache.hudi.common.table.read.InputSplit; +import org.apache.hudi.common.table.read.ReaderParameters; +import org.apache.hudi.common.table.read.UpdateProcessor; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.OrderingValues; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.function.UnaryOperator; + +import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH; +import static 
org.apache.hudi.common.config.HoodieReaderConfig.LSM_SORT_MERGE_SPILL_THRESHOLD; +import static org.apache.hudi.common.schema.HoodieSchemaCompatibility.areSchemasProjectionEquivalent; +import static org.apache.hudi.io.util.FileIOUtils.getDefaultSpillableMapBasePath; + +/** + * Streaming sorted-merge iterator for RFC-103 LSM file groups. + * + * <p>The iterator merges one optional L1/base sorted run with zero or more L0/native parquet log + * runs. Every input file must already be sorted by record key; this class does not sort records + * within a file. The active head record from each participating run is tracked by a loser tree, + * which provides efficient k-way merge behavior while preserving deterministic source ordering for + * records with the same key. + * + * <p>Merge order follows the same conflict resolution model as {@code HoodieFileGroupReader}: the + * L1/base file is processed first, and L0 log files are processed in file-group log order so newer + * log instants or versions can win when ordering values tie. Native delete logs contain only delete + * metadata and are converted into {@link BufferedRecord} delete records before entering the merge. + * + * <p>To reduce open-reader memory pressure when a file group has many L0 runs, the iterator can spill + * selected L0 file iterators to sequential temporary files. Spilling changes how an input run is + * buffered, but not its merge order or merge semantics. The L1/base iterator is always kept direct, + * and native delete logs plus smaller L0 files are prioritized for direct reading. 
+ */ +public class LsmFileGroupRecordIterator<T> implements ClosableIterator<BufferedRecord<T>> { + + private static final String DELETE_LOG_RECORD_KEY_FIELD = "record_key"; + + private final HoodieReaderContext<T> readerContext; + private final HoodieStorage storage; + private final InputSplit inputSplit; + private final HoodieSchema readerSchema; + private final List<String> orderingFieldNames; + private final boolean includeBaseFile; + private final BufferedRecordMerger<T> bufferedRecordMerger; + private final UpdateProcessor<T> updateProcessor; + private final LoserTree<T> readers; + private final int spillThreshold; + private final String spillBasePath; + private BufferedRecord<T> nextRecord; + + /** + * Creates an iterator that merges both the L1/base file, when present, and all L0 native log files. + */ + public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext, + HoodieStorage storage, + InputSplit inputSplit, + List<String> orderingFieldNames, + HoodieTableMetaClient metaClient, + TypedProperties props, + ReaderParameters readerParameters, + HoodieReadStats readStats, + Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) throws IOException { + this(readerContext, storage, inputSplit, orderingFieldNames, metaClient, props, readerParameters, readStats, fileGroupUpdateCallback, true); + } + + /** + * Creates an iterator over an LSM file group. + * + * @param includeBaseFile whether the L1/base file should be included in the merge. Passing + * {@code false} produces a log-only view for callers that only need L0 data. 
+ */ + public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext, + HoodieStorage storage, + InputSplit inputSplit, + List<String> orderingFieldNames, + HoodieTableMetaClient metaClient, + TypedProperties props, + ReaderParameters readerParameters, + HoodieReadStats readStats, + Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback, + boolean includeBaseFile) throws IOException { + this.readerContext = readerContext; + this.storage = storage; + this.inputSplit = inputSplit; + this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema(); + this.orderingFieldNames = orderingFieldNames; + this.includeBaseFile = includeBaseFile; + this.bufferedRecordMerger = BufferedRecordMergerFactory.create( + readerContext, readerContext.getMergeMode(), false, readerContext.getRecordMerger(), + readerSchema, readerContext.getPayloadClasses(props), props, metaClient.getTableConfig().getPartialUpdateMode()); + this.updateProcessor = UpdateProcessor.create(readStats, readerContext, readerParameters.isEmitDeletes(), fileGroupUpdateCallback, props); + this.spillThreshold = Math.max(0, props.getInteger(LSM_SORT_MERGE_SPILL_THRESHOLD.key(), LSM_SORT_MERGE_SPILL_THRESHOLD.defaultValue())); + this.spillBasePath = props.getString(SPILLABLE_MAP_BASE_PATH.key(), getDefaultSpillableMapBasePath()); + this.readers = new LoserTree<>(initializeReaders()); + } + + /** + * Builds one reader state for each sorted run that has at least one record. + * + * <p>The assigned {@code mergeOrder} is the stable source precedence used when multiple runs expose + * the same key. It is assigned before spill selection so direct and spilled iterators remain + * semantically identical during the loser-tree merge. 
+ */ + private List<ReaderState<T>> initializeReaders() throws IOException { + List<ReaderState<T>> readerStates = new ArrayList<>(); + int mergeOrder = 0; + boolean hasBaseFileReader = includeBaseFile && inputSplit.getBaseFileOption().isPresent(); + if (hasBaseFileReader) { + addReader(readerStates, mergeOrder++, createBaseFileIterator(inputSplit.getBaseFileOption().get())); + } + + List<LogReaderSpec> logReaderSpecs = new ArrayList<>(); + for (HoodieLogFile logFile : inputSplit.getLogFiles()) { + logReaderSpecs.add(new LogReaderSpec(mergeOrder++, logFile)); + } + Set<Integer> directLogMergeOrders = selectDirectLogMergeOrders(logReaderSpecs, hasBaseFileReader); + for (LogReaderSpec spec : logReaderSpecs) { + ClosableIterator<BufferedRecord<T>> iterator = createFileIterator(spec.logFile.getPathInfo(), spec.logFile.getPath(), spec.logFile.getFileSize()); + addReader(readerStates, spec.mergeOrder, maybeSpillIterator(directLogMergeOrders.contains(spec.mergeOrder), iterator)); + } + return readerStates; + } + + /** + * Selects which L0 log readers stay direct under the configured spill threshold. + * + * <p>The base/L1 reader consumes one direct-reader slot when present and is never spilled. Remaining + * direct-reader budget is spent on native delete logs first, then smaller log files, because those + * readers tend to be cheaper to keep open while avoiding unnecessary spill materialization. + */ + private Set<Integer> selectDirectLogMergeOrders(List<LogReaderSpec> logReaderSpecs, boolean hasBaseFileReader) { + int directLogBudget = spillThreshold - (hasBaseFileReader ? 
1 : 0); + if (directLogBudget <= 0) { + return new HashSet<>(); + } + Set<Integer> directMergeOrders = new HashSet<>(); + logReaderSpecs.stream() + .sorted(Comparator + .comparing((LogReaderSpec spec) -> !spec.nativeDeleteLog) + .thenComparingLong(spec -> spec.fileSize) + .thenComparingInt(spec -> spec.mergeOrder)) + .limit(directLogBudget) + .forEach(spec -> directMergeOrders.add(spec.mergeOrder)); + return directMergeOrders; + } + + /** + * Returns the original iterator when it is selected for direct reading, otherwise materializes it + * into a sequential spill iterator. + */ + private ClosableIterator<BufferedRecord<T>> maybeSpillIterator(boolean directReader, + ClosableIterator<BufferedRecord<T>> iterator) { + if (directReader) { + return iterator; + } + return new SpillableLsmRecordIterator<>(iterator, readerContext.getRecordSerializer(), readerContext.getRecordContext(), spillBasePath); + } + + /** + * Metadata used only for choosing the direct-versus-spilled L0 reader plan. + */ + private static class LogReaderSpec { + private final int mergeOrder; + private final HoodieLogFile logFile; + private final boolean nativeDeleteLog; + private final long fileSize; + + private LogReaderSpec(int mergeOrder, HoodieLogFile logFile) { + this.mergeOrder = mergeOrder; + this.logFile = logFile; + this.nativeDeleteLog = FSUtils.isNativeDeleteLogFile(logFile.getFileName()); + this.fileSize = logFile.getFileSize() >= 0 ? logFile.getFileSize() : Long.MAX_VALUE; + } + } + + /** + * Adds a reader to the merge only when the underlying sorted run contains at least one record. 
+ */ + private void addReader(List<ReaderState<T>> readerStates, int mergeOrder, ClosableIterator<BufferedRecord<T>> iterator) { + ReaderState<T> readerState = new ReaderState<>(mergeOrder, iterator); + if (readerState.advance()) { + readerStates.add(readerState); + } else { + readerState.close(); + } + } + + /** + * Creates the L1/base sorted-run iterator, using the bootstrap base file when the base file is + * bootstrapped. + */ + private ClosableIterator<BufferedRecord<T>> createBaseFileIterator(HoodieBaseFile baseFile) throws IOException { + BaseFile file = baseFile.getBootstrapBaseFile().orElse(baseFile); + return createFileIterator(file.getPathInfo(), file.getStoragePath(), file.getFileSize()); + } + + /** + * Creates a sorted-run iterator for a parquet data file or a native parquet log file. + * + * <p>Native delete logs use a specialized schema and are routed through + * {@link #createNativeDeleteLogIterator(StoragePathInfo, StoragePath, long)}. + */ + private ClosableIterator<BufferedRecord<T>> createFileIterator(StoragePathInfo pathInfo, + StoragePath path, + long fileSize) throws IOException { + StoragePath storagePath = pathInfo != null ? pathInfo.getPath() : path; + if (FSUtils.isNativeDeleteLogFile(storagePath.getName())) { + return createNativeDeleteLogIterator(pathInfo, storagePath, fileSize); + } + Pair<HoodieSchema, Map<String, String>> requiredSchemaAndRenamedColumns = + readerContext.getSchemaHandler().getRequiredSchemaForFileAndRenamedColumns(storagePath); + HoodieSchema fileRequiredSchema = requiredSchemaAndRenamedColumns.getLeft(); + ClosableIterator<T> recordIterator; + if (pathInfo != null) { + recordIterator = readerContext.getFileRecordIterator( + pathInfo, 0, pathInfo.getLength(), readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage); + } else { + long length = fileSize >= 0 ? 
fileSize : storage.getPathInfo(storagePath).getLength(); + recordIterator = readerContext.getFileRecordIterator( + storagePath, 0, length, readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage); + } + if (!areSchemasProjectionEquivalent(fileRequiredSchema, readerSchema) || !requiredSchemaAndRenamedColumns.getRight().isEmpty()) { + UnaryOperator<T> projector = readerContext.getRecordContext() + .projectRecord(fileRequiredSchema, readerSchema, requiredSchemaAndRenamedColumns.getRight()); + recordIterator = new CloseableMappingIterator<>(recordIterator, projector); + } + if (readerContext.getInstantRange().isPresent()) { + recordIterator = readerContext.applyInstantRangeFilter(recordIterator); + } + return new CloseableMappingIterator<>(recordIterator, record -> BufferedRecords.fromEngineRecord( + readerContext.getRecordContext().seal(record), + readerSchema, + readerContext.getRecordContext(), + orderingFieldNames, + readerContext.getRecordContext().isDeleteRecord(record, readerContext.getSchemaHandler().getDeleteContext().withReaderSchema(readerSchema)))); + } + + /** + * Creates delete records from an RFC-103 native delete log. + * + * <p>The delete log schema intentionally contains only the record key and ordering value fields; + * partition path and full data columns are not read for delete-only logs. + */ + private ClosableIterator<BufferedRecord<T>> createNativeDeleteLogIterator(StoragePathInfo pathInfo, + StoragePath storagePath, + long fileSize) throws IOException { + ClosableIterator<T> recordIterator; + if (pathInfo != null) { + recordIterator = readerContext.getFileRecordIterator( + pathInfo, 0, pathInfo.getLength(), HoodieSchemas.DELETE_LOG_SCHEMA, HoodieSchemas.DELETE_LOG_SCHEMA, storage); + } else { + long length = fileSize >= 0 ? 
fileSize : storage.getPathInfo(storagePath).getLength(); + recordIterator = readerContext.getFileRecordIterator( + storagePath, 0, length, HoodieSchemas.DELETE_LOG_SCHEMA, HoodieSchemas.DELETE_LOG_SCHEMA, storage); + } + return new CloseableMappingIterator<>(recordIterator, record -> { + Object recordKey = readerContext.getRecordContext().getValue(record, HoodieSchemas.DELETE_LOG_SCHEMA, DELETE_LOG_RECORD_KEY_FIELD); + return BufferedRecords.createDelete(recordKey.toString(), OrderingValues.getDefault()); Review Comment: 🤖 I think this concern is real for event-time/custom merge modes. The existing path in `BufferedRecords.fromDeleteRecord` preserves the actual ordering via `recordContext.getOrderingValue(deleteRecord)` rather than a default, and `DELETE_LOG_SCHEMA` carries `ordering_val` for exactly this reason. Hardcoding `OrderingValues.getDefault()` would make these deletes lose to inserts they should override whenever the writer persists a real event-time ordering value — worth reading it back from the delete record and passing it into `createDelete(...)`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
