cshuo commented on code in PR #18987:
URL: https://github.com/apache/hudi/pull/18987#discussion_r3456903258


##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java:
##########


Review Comment:
   This class should also be updated to cover the native CDC log file naming, `...cdc.parquet`.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java:
##########
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemas;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.BaseFileUpdateCallback;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.table.read.BufferedRecords;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.InputSplit;
+import org.apache.hudi.common.table.read.ReaderParameters;
+import org.apache.hudi.common.table.read.UpdateProcessor;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
+import static 
org.apache.hudi.common.config.HoodieReaderConfig.LSM_SORT_MERGE_SPILL_THRESHOLD;
+import static 
org.apache.hudi.common.schema.HoodieSchemaCompatibility.areSchemasProjectionEquivalent;
+import static 
org.apache.hudi.io.util.FileIOUtils.getDefaultSpillableMapBasePath;
+
+/**
+ * Streaming sorted-merge iterator for RFC-103 LSM file groups.
+ *
+ * <p>The iterator merges one optional L1/base sorted run with zero or more 
L0/native parquet log
+ * runs. Every input file must already be sorted by record key; this class 
does not sort records
+ * within a file. The active head record from each participating run is 
tracked by a loser tree,
+ * which provides efficient k-way merge behavior while preserving 
deterministic source ordering for
+ * records with the same key.
+ *
+ * <p>Merge order follows the same conflict resolution model as {@code 
HoodieFileGroupReader}: the
+ * L1/base file is processed first, and L0 log files are processed in 
file-group log order so newer
+ * log instants or versions can win when ordering values tie. Native delete 
logs contain only delete
+ * metadata and are converted into {@link BufferedRecord} delete records 
before entering the merge.
+ *
+ * <p>To reduce open-reader memory pressure when a file group has many L0 
runs, the iterator can spill
+ * selected L0 file iterators to sequential temporary files. Spilling changes 
how an input run is
+ * buffered, but not its merge order or merge semantics. The L1/base iterator 
is always kept direct,
+ * and native delete logs plus smaller L0 files are prioritized for direct 
reading.
+ */
+public class LsmFileGroupRecordIterator<T> implements 
ClosableIterator<BufferedRecord<T>> {
+
+  private static final String DELETE_LOG_RECORD_KEY_FIELD = "record_key";
+  private static final List<String> DELETE_LOG_ORDERING_FIELD_NAMES = 
Arrays.asList("ordering_val");
+
+  private final HoodieReaderContext<T> readerContext;
+  private final HoodieStorage storage;
+  private final InputSplit inputSplit;
+  private final HoodieSchema readerSchema;
+  private final List<String> orderingFieldNames;
+  private final boolean includeBaseFile;
+  private final BufferedRecordMerger<T> bufferedRecordMerger;
+  private final UpdateProcessor<T> updateProcessor;
+  private final LoserTree<T> readers;
+  private final int spillThreshold;
+  private final String spillBasePath;
+  private BufferedRecord<T> nextRecord;
+
+  /**
+   * Creates an iterator that merges both the L1/base file, when present, and 
all L0 native log files.
+   */
+  public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext,
+                                    HoodieStorage storage,
+                                    InputSplit inputSplit,
+                                    List<String> orderingFieldNames,
+                                    HoodieTableMetaClient metaClient,
+                                    TypedProperties props,
+                                    ReaderParameters readerParameters,
+                                    HoodieReadStats readStats,
+                                    Option<BaseFileUpdateCallback<T>> 
fileGroupUpdateCallback) throws IOException {
+    this(readerContext, storage, inputSplit, orderingFieldNames, metaClient, 
props, readerParameters, readStats, fileGroupUpdateCallback, true);
+  }
+
+  /**
+   * Creates an iterator over an LSM file group.
+   *
+   * @param includeBaseFile whether the L1/base file should be included in the 
merge. Passing
+   *                        {@code false} produces a log-only view for callers 
that only need L0 data.
+   */
+  public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext,
+                                    HoodieStorage storage,
+                                    InputSplit inputSplit,
+                                    List<String> orderingFieldNames,
+                                    HoodieTableMetaClient metaClient,
+                                    TypedProperties props,
+                                    ReaderParameters readerParameters,
+                                    HoodieReadStats readStats,
+                                    Option<BaseFileUpdateCallback<T>> 
fileGroupUpdateCallback,
+                                    boolean includeBaseFile) throws 
IOException {
+    this.readerContext = readerContext;
+    this.storage = storage;
+    this.inputSplit = inputSplit;
+    this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
+    this.orderingFieldNames = orderingFieldNames;
+    this.includeBaseFile = includeBaseFile;
+    this.bufferedRecordMerger = BufferedRecordMergerFactory.create(
+        readerContext, readerContext.getMergeMode(), false, 
readerContext.getRecordMerger(),
+        readerSchema, readerContext.getPayloadClasses(props), props, 
metaClient.getTableConfig().getPartialUpdateMode());
+    this.updateProcessor = UpdateProcessor.create(readStats, readerContext, 
readerParameters.isEmitDeletes(), fileGroupUpdateCallback, props);
+    this.spillThreshold = Math.max(0, 
props.getInteger(LSM_SORT_MERGE_SPILL_THRESHOLD.key(), 
LSM_SORT_MERGE_SPILL_THRESHOLD.defaultValue()));
+    this.spillBasePath = props.getString(SPILLABLE_MAP_BASE_PATH.key(), 
getDefaultSpillableMapBasePath());
+    this.readers = new LoserTree<>(initializeReaders());
+  }
+
+  /**
+   * Builds one sorted-run reader for each sorted run that has at least one 
record.
+   *
+   * <p>The assigned {@code mergeOrder} is the stable source precedence used 
when multiple runs expose
+   * the same key. It is assigned before spill selection so direct and spilled 
iterators remain
+   * semantically identical during the loser-tree merge.
+   */
+  private List<SortedRunReader<T>> initializeReaders() throws IOException {
+    List<SortedRunReader<T>> sortedRunReaders = new ArrayList<>();
+    int mergeOrder = 0;
+    boolean hasBaseFileReader = includeBaseFile && 
inputSplit.getBaseFileOption().isPresent();
+    if (hasBaseFileReader) {
+      addReader(sortedRunReaders, mergeOrder++, 
createBaseFileIterator(inputSplit.getBaseFileOption().get()));
+    }
+
+    List<LogReaderSpec> logReaderSpecs = new ArrayList<>();
+    for (HoodieLogFile logFile : inputSplit.getLogFiles()) {
+      logReaderSpecs.add(new LogReaderSpec(mergeOrder++, logFile));
+    }
+    Set<Integer> directLogMergeOrders = 
selectDirectLogMergeOrders(logReaderSpecs, hasBaseFileReader);
+    for (LogReaderSpec spec : logReaderSpecs) {
+      ClosableIterator<BufferedRecord<T>> iterator = 
createFileIterator(spec.logFile.getPathInfo(), spec.logFile.getPath(), 
spec.logFile.getFileSize());
+      addReader(sortedRunReaders, spec.mergeOrder, 
maybeSpillIterator(directLogMergeOrders.contains(spec.mergeOrder), iterator));
+    }
+    return sortedRunReaders;
+  }
+
+  /**
+   * Selects which L0 log readers stay direct under the configured spill 
threshold.
+   *
+   * <p>The base/L1 reader consumes one direct-reader slot when present and is 
never spilled. Remaining
+   * direct-reader budget is spent on native delete logs first, then smaller 
log files, because those
+   * readers tend to be cheaper to keep open while avoiding unnecessary spill 
materialization.
+   */
+  private Set<Integer> selectDirectLogMergeOrders(List<LogReaderSpec> 
logReaderSpecs, boolean hasBaseFileReader) {
+    return selectDirectLogMergeOrders(logReaderSpecs, hasBaseFileReader, 
spillThreshold);
+  }
+
+  @VisibleForTesting
+  static Set<Integer> selectDirectLogMergeOrders(List<LogReaderSpec> 
logReaderSpecs,
+                                                 boolean hasBaseFileReader,
+                                                 int spillThreshold) {
+    int directLogBudget = spillThreshold - (hasBaseFileReader ? 1 : 0);
+    if (directLogBudget <= 0) {
+      return new HashSet<>();
+    }
+    Set<Integer> directMergeOrders = new HashSet<>();
+    logReaderSpecs.stream()
+        .sorted(Comparator
+            .comparing((LogReaderSpec spec) -> !spec.nativeDeleteLog)
+            .thenComparingLong(spec -> spec.fileSize)
+            .thenComparingInt(spec -> spec.mergeOrder))
+        .limit(directLogBudget)
+        .forEach(spec -> directMergeOrders.add(spec.mergeOrder));
+    return directMergeOrders;
+  }
+
+  /**
+   * Returns the original iterator when it is selected for direct reading, 
otherwise materializes it
+   * into a sequential spill iterator.
+   */
+  private ClosableIterator<BufferedRecord<T>> maybeSpillIterator(boolean 
directReader,
+                                                                 
ClosableIterator<BufferedRecord<T>> iterator) {
+    if (directReader) {
+      return iterator;
+    }
+    return new SpillableLsmRecordIterator<>(iterator, 
readerContext.getRecordSerializer(), readerContext.getRecordContext(), 
spillBasePath);
+  }
+
+  /**
+   * Metadata used only for choosing the direct-versus-spilled L0 reader plan.
+   */
+  @VisibleForTesting
+  static class LogReaderSpec {
+    final int mergeOrder;
+    final HoodieLogFile logFile;
+    final boolean nativeDeleteLog;
+    final long fileSize;
+
+    LogReaderSpec(int mergeOrder, HoodieLogFile logFile) {
+      this.mergeOrder = mergeOrder;
+      this.logFile = logFile;
+      this.nativeDeleteLog = 
FSUtils.isNativeDeleteLogFile(logFile.getFileName());
+      this.fileSize = logFile.getFileSize() >= 0 ? logFile.getFileSize() : 
Long.MAX_VALUE;
+    }
+  }
+
+  /**
+   * Adds a reader to the merge only when the underlying sorted run contains 
at least one record.
+   */
+  private void addReader(List<SortedRunReader<T>> sortedRunReaders, int 
mergeOrder, ClosableIterator<BufferedRecord<T>> iterator) {
+    SortedRunReader<T> sortedRunReader = new SortedRunReader<>(mergeOrder, 
iterator);
+    if (sortedRunReader.advance()) {
+      sortedRunReaders.add(sortedRunReader);
+    } else {
+      sortedRunReader.close();
+    }
+  }
+
+  /**
+   * Creates the L1/base sorted-run iterator.
+   */
+  private ClosableIterator<BufferedRecord<T>> 
createBaseFileIterator(HoodieBaseFile baseFile) throws IOException {
+    if (baseFile.getBootstrapBaseFile().isPresent()) {
+      // Bootstrap base files require joining the skeleton file with the 
external data file.
+      // Keep that path on HoodieFileGroupReader until the LSM reader 
implements the same merge.
+      throw new UnsupportedOperationException("LSM file group reader does not 
support bootstrap base files");
+    }
+    BaseFile file = baseFile.getBootstrapBaseFile().orElse(baseFile);
+    return createFileIterator(file.getPathInfo(), file.getStoragePath(), 
file.getFileSize());
+  }
+
+  /**
+   * Creates a sorted-run iterator for a parquet data file or a native parquet 
log file.
+   *
+   * <p>Native delete logs use a specialized schema and are routed through
+   * {@link #createNativeDeleteLogIterator(StoragePathInfo, StoragePath, 
long)}.
+   */
+  private ClosableIterator<BufferedRecord<T>> 
createFileIterator(StoragePathInfo pathInfo,

Review Comment:
   `InputSplit` filters CDC logs via `endsWith(".cdc")`, which fails for native
   parquet naming (`...cdc.parquet`), so CDC logs leak into the LSM merge and
   get read as data. The filter needs to switch to `HoodieLogFile.isCDC()`.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java:
##########
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemas;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.BaseFileUpdateCallback;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.table.read.BufferedRecords;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.InputSplit;
+import org.apache.hudi.common.table.read.ReaderParameters;
+import org.apache.hudi.common.table.read.UpdateProcessor;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
+import static 
org.apache.hudi.common.config.HoodieReaderConfig.LSM_SORT_MERGE_SPILL_THRESHOLD;
+import static 
org.apache.hudi.common.schema.HoodieSchemaCompatibility.areSchemasProjectionEquivalent;
+import static 
org.apache.hudi.io.util.FileIOUtils.getDefaultSpillableMapBasePath;
+
+/**
+ * Streaming sorted-merge iterator for RFC-103 LSM file groups.
+ *
+ * <p>The iterator merges one optional L1/base sorted run with zero or more 
L0/native parquet log
+ * runs. Every input file must already be sorted by record key; this class 
does not sort records
+ * within a file. The active head record from each participating run is 
tracked by a loser tree,
+ * which provides efficient k-way merge behavior while preserving 
deterministic source ordering for
+ * records with the same key.
+ *
+ * <p>Merge order follows the same conflict resolution model as {@code 
HoodieFileGroupReader}: the
+ * L1/base file is processed first, and L0 log files are processed in 
file-group log order so newer
+ * log instants or versions can win when ordering values tie. Native delete 
logs contain only delete
+ * metadata and are converted into {@link BufferedRecord} delete records 
before entering the merge.
+ *
+ * <p>To reduce open-reader memory pressure when a file group has many L0 
runs, the iterator can spill
+ * selected L0 file iterators to sequential temporary files. Spilling changes 
how an input run is
+ * buffered, but not its merge order or merge semantics. The L1/base iterator 
is always kept direct,
+ * and native delete logs plus smaller L0 files are prioritized for direct 
reading.
+ */
+public class LsmFileGroupRecordIterator<T> implements 
ClosableIterator<BufferedRecord<T>> {
+
+  private static final String DELETE_LOG_RECORD_KEY_FIELD = "record_key";
+  private static final List<String> DELETE_LOG_ORDERING_FIELD_NAMES = 
Arrays.asList("ordering_val");
+
+  private final HoodieReaderContext<T> readerContext;
+  private final HoodieStorage storage;
+  private final InputSplit inputSplit;
+  private final HoodieSchema readerSchema;
+  private final List<String> orderingFieldNames;
+  private final boolean includeBaseFile;
+  private final BufferedRecordMerger<T> bufferedRecordMerger;
+  private final UpdateProcessor<T> updateProcessor;
+  private final LoserTree<T> readers;
+  private final int spillThreshold;
+  private final String spillBasePath;
+  private BufferedRecord<T> nextRecord;
+
+  /**
+   * Creates an iterator that merges both the L1/base file, when present, and 
all L0 native log files.
+   */
+  public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext,
+                                    HoodieStorage storage,
+                                    InputSplit inputSplit,
+                                    List<String> orderingFieldNames,
+                                    HoodieTableMetaClient metaClient,
+                                    TypedProperties props,
+                                    ReaderParameters readerParameters,
+                                    HoodieReadStats readStats,
+                                    Option<BaseFileUpdateCallback<T>> 
fileGroupUpdateCallback) throws IOException {
+    this(readerContext, storage, inputSplit, orderingFieldNames, metaClient, 
props, readerParameters, readStats, fileGroupUpdateCallback, true);
+  }
+
+  /**
+   * Creates an iterator over an LSM file group.
+   *
+   * @param includeBaseFile whether the L1/base file should be included in the 
merge. Passing
+   *                        {@code false} produces a log-only view for callers 
that only need L0 data.
+   */
+  public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext,
+                                    HoodieStorage storage,
+                                    InputSplit inputSplit,
+                                    List<String> orderingFieldNames,
+                                    HoodieTableMetaClient metaClient,
+                                    TypedProperties props,
+                                    ReaderParameters readerParameters,
+                                    HoodieReadStats readStats,
+                                    Option<BaseFileUpdateCallback<T>> 
fileGroupUpdateCallback,
+                                    boolean includeBaseFile) throws 
IOException {
+    this.readerContext = readerContext;
+    this.storage = storage;
+    this.inputSplit = inputSplit;
+    this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
+    this.orderingFieldNames = orderingFieldNames;
+    this.includeBaseFile = includeBaseFile;
+    this.bufferedRecordMerger = BufferedRecordMergerFactory.create(
+        readerContext, readerContext.getMergeMode(), false, 
readerContext.getRecordMerger(),
+        readerSchema, readerContext.getPayloadClasses(props), props, 
metaClient.getTableConfig().getPartialUpdateMode());
+    this.updateProcessor = UpdateProcessor.create(readStats, readerContext, 
readerParameters.isEmitDeletes(), fileGroupUpdateCallback, props);
+    this.spillThreshold = Math.max(0, 
props.getInteger(LSM_SORT_MERGE_SPILL_THRESHOLD.key(), 
LSM_SORT_MERGE_SPILL_THRESHOLD.defaultValue()));
+    this.spillBasePath = props.getString(SPILLABLE_MAP_BASE_PATH.key(), 
getDefaultSpillableMapBasePath());
+    this.readers = new LoserTree<>(initializeReaders());
+  }
+
+  /**
+   * Builds one sorted-run reader for each sorted run that has at least one 
record.
+   *
+   * <p>The assigned {@code mergeOrder} is the stable source precedence used 
when multiple runs expose
+   * the same key. It is assigned before spill selection so direct and spilled 
iterators remain
+   * semantically identical during the loser-tree merge.
+   */
+  private List<SortedRunReader<T>> initializeReaders() throws IOException {
+    List<SortedRunReader<T>> sortedRunReaders = new ArrayList<>();
+    int mergeOrder = 0;
+    boolean hasBaseFileReader = includeBaseFile && 
inputSplit.getBaseFileOption().isPresent();
+    if (hasBaseFileReader) {
+      addReader(sortedRunReaders, mergeOrder++, 
createBaseFileIterator(inputSplit.getBaseFileOption().get()));
+    }
+
+    List<LogReaderSpec> logReaderSpecs = new ArrayList<>();
+    for (HoodieLogFile logFile : inputSplit.getLogFiles()) {
+      logReaderSpecs.add(new LogReaderSpec(mergeOrder++, logFile));
+    }
+    Set<Integer> directLogMergeOrders = 
selectDirectLogMergeOrders(logReaderSpecs, hasBaseFileReader);
+    for (LogReaderSpec spec : logReaderSpecs) {
+      ClosableIterator<BufferedRecord<T>> iterator = 
createFileIterator(spec.logFile.getPathInfo(), spec.logFile.getPath(), 
spec.logFile.getFileSize());
+      addReader(sortedRunReaders, spec.mergeOrder, 
maybeSpillIterator(directLogMergeOrders.contains(spec.mergeOrder), iterator));
+    }
+    return sortedRunReaders;
+  }
+
+  /**
+   * Selects which L0 log readers stay direct under the configured spill 
threshold.
+   *
+   * <p>The base/L1 reader consumes one direct-reader slot when present and is 
never spilled. Remaining
+   * direct-reader budget is spent on native delete logs first, then smaller 
log files, because those
+   * readers tend to be cheaper to keep open while avoiding unnecessary spill 
materialization.
+   */
+  private Set<Integer> selectDirectLogMergeOrders(List<LogReaderSpec> 
logReaderSpecs, boolean hasBaseFileReader) {
+    return selectDirectLogMergeOrders(logReaderSpecs, hasBaseFileReader, 
spillThreshold);
+  }
+
+  @VisibleForTesting
+  static Set<Integer> selectDirectLogMergeOrders(List<LogReaderSpec> 
logReaderSpecs,
+                                                 boolean hasBaseFileReader,
+                                                 int spillThreshold) {
+    int directLogBudget = spillThreshold - (hasBaseFileReader ? 1 : 0);
+    if (directLogBudget <= 0) {
+      return new HashSet<>();
+    }
+    Set<Integer> directMergeOrders = new HashSet<>();
+    logReaderSpecs.stream()
+        .sorted(Comparator
+            .comparing((LogReaderSpec spec) -> !spec.nativeDeleteLog)
+            .thenComparingLong(spec -> spec.fileSize)
+            .thenComparingInt(spec -> spec.mergeOrder))
+        .limit(directLogBudget)
+        .forEach(spec -> directMergeOrders.add(spec.mergeOrder));
+    return directMergeOrders;
+  }
+
+  /**
+   * Returns the original iterator when it is selected for direct reading, 
otherwise materializes it
+   * into a sequential spill iterator.
+   */
+  private ClosableIterator<BufferedRecord<T>> maybeSpillIterator(boolean 
directReader,
+                                                                 
ClosableIterator<BufferedRecord<T>> iterator) {
+    if (directReader) {
+      return iterator;
+    }
+    return new SpillableLsmRecordIterator<>(iterator, 
readerContext.getRecordSerializer(), readerContext.getRecordContext(), 
spillBasePath);
+  }
+
+  /**
+   * Metadata used only for choosing the direct-versus-spilled L0 reader plan.
+   */
+  @VisibleForTesting
+  static class LogReaderSpec {
+    final int mergeOrder;
+    final HoodieLogFile logFile;
+    final boolean nativeDeleteLog;
+    final long fileSize;
+
+    LogReaderSpec(int mergeOrder, HoodieLogFile logFile) {
+      this.mergeOrder = mergeOrder;
+      this.logFile = logFile;
+      this.nativeDeleteLog = 
FSUtils.isNativeDeleteLogFile(logFile.getFileName());
+      this.fileSize = logFile.getFileSize() >= 0 ? logFile.getFileSize() : 
Long.MAX_VALUE;
+    }
+  }
+
+  /**
+   * Adds a reader to the merge only when the underlying sorted run contains 
at least one record.
+   */
+  private void addReader(List<SortedRunReader<T>> sortedRunReaders, int 
mergeOrder, ClosableIterator<BufferedRecord<T>> iterator) {
+    SortedRunReader<T> sortedRunReader = new SortedRunReader<>(mergeOrder, 
iterator);
+    if (sortedRunReader.advance()) {
+      sortedRunReaders.add(sortedRunReader);
+    } else {
+      sortedRunReader.close();
+    }
+  }
+
+  /**
+   * Creates the L1/base sorted-run iterator.
+   */
+  private ClosableIterator<BufferedRecord<T>> 
createBaseFileIterator(HoodieBaseFile baseFile) throws IOException {
+    if (baseFile.getBootstrapBaseFile().isPresent()) {
+      // Bootstrap base files require joining the skeleton file with the 
external data file.
+      // Keep that path on HoodieFileGroupReader until the LSM reader 
implements the same merge.
+      throw new UnsupportedOperationException("LSM file group reader does not 
support bootstrap base files");
+    }
+    BaseFile file = baseFile.getBootstrapBaseFile().orElse(baseFile);

Review Comment:
   The `.orElse` is dead code: the bootstrap case already throws just above, so this line always returns `baseFile`.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/SpillableLsmRecordIterator.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.engine.RecordContext;
+import org.apache.hudi.common.serialization.CustomSerializer;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieIOException;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/**
+ * Sequential disk-backed iterator for sorted LSM inputs.
+ *
+ * <p>The source iterator is drained into a length-prefixed spill file and 
closed. The resulting
+ * iterator reads the records back sequentially, which matches the loser-tree 
access pattern.
+ */
+class SpillableLsmRecordIterator<T> implements 
ClosableIterator<BufferedRecord<T>> {
+
+  private static final int BUFFER_SIZE = 128 * 1024;
+  private static final String SPILL_FILE_PREFIX = "hudi-lsm-";
+  private static final String SPILL_FILE_SUFFIX = ".spill";
+
+  private final CustomSerializer<BufferedRecord<T>> serializer;
+  private final RecordContext<T> recordContext;
+  private final File spillFile;
+  private final long recordCount;
+  private DataInputStream inputStream;
+  private long recordsRead;
+  private BufferedRecord<T> nextRecord;
+  private boolean closed;
+
+  SpillableLsmRecordIterator(ClosableIterator<BufferedRecord<T>> 
sourceIterator,
+                             CustomSerializer<BufferedRecord<T>> serializer,
+                             RecordContext<T> recordContext,
+                             String spillBasePath) {
+    this.serializer = serializer;
+    this.recordContext = recordContext;
+    Throwable spillFailure = null;
+    try {
+      Path spillDirectory = Paths.get(spillBasePath);
+      Files.createDirectories(spillDirectory);
+      this.spillFile = Files.createTempFile(spillDirectory, SPILL_FILE_PREFIX, 
SPILL_FILE_SUFFIX).toFile();
+      this.spillFile.deleteOnExit();

Review Comment:
   Entries added via `File.deleteOnExit()` are never removed from the JVM's
   shutdown list. In long-running jobs, such as Flink streaming jobs, this list
   grows without bound and becomes a slow memory leak.
   
   Since `close()` and the failure path already delete the file explicitly,
   it's safe to remove this line.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/HoodieLsmFileGroupReader.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.BaseFileUpdateCallback;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.HoodieRecordReader;
+import org.apache.hudi.common.table.read.InputSplit;
+import org.apache.hudi.common.table.read.IteratorMode;
+import org.apache.hudi.common.table.read.ParquetRowIndexBasedSchemaHandler;
+import org.apache.hudi.common.table.read.ReaderParameters;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import lombok.Builder;
+import lombok.Getter;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.UnaryOperator;
+import java.util.stream.Stream;
+
+/**
+ * Record reader for RFC-103 LSM file groups backed by native parquet log 
files.
+ *
+ * <p>This reader is intentionally separate from {@code 
HoodieFileGroupReader}. Callers should use it
+ * only when the file group follows pure LSM sorted-file semantics: the 
optional base file is treated
+ * as the L1 sorted run and native parquet log files are treated as L0 sorted 
runs. Mixed legacy log
+ * file groups should continue to use {@code HoodieFileGroupReader}.
+ *
+ * <p>The reader owns file-group level setup that mirrors {@code 
HoodieFileGroupReader}: schema
+ * handling, merge properties, iterator mode, output projection, read stats, 
and update callbacks.
+ * The actual sorted k-way merge is delegated to {@link 
LsmFileGroupRecordIterator}.
+ */
+public final class HoodieLsmFileGroupReader<T> implements 
HoodieRecordReader<T> {
+
+  private final HoodieReaderContext<T> readerContext;
+  private final HoodieTableMetaClient metaClient;
+  private final InputSplit inputSplit;
+  private final List<String> orderingFieldNames;
+  private final HoodieStorage storage;
+  private final TypedProperties props;
+  private final ReaderParameters readerParameters;
+  private final Option<UnaryOperator<T>> outputConverter;
+  private final Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback;
+  private ClosableIterator<BufferedRecord<T>> lsmRecordIterator;
+  @Getter
+  private final HoodieReadStats readStats;
+
+  @Builder(setterPrefix = "with")
+  private HoodieLsmFileGroupReader(
+      HoodieReaderContext<T> readerContext,
+      String latestCommitTime,
+      HoodieSchema dataSchema,
+      HoodieSchema requestedSchema,
+      Option<InternalSchema> internalSchemaOpt,
+      HoodieTableMetaClient hoodieTableMetaClient,
+      TypedProperties props,
+      Option<HoodieBaseFile> baseFileOption,
+      Stream<HoodieLogFile> logFiles,
+      String partitionPath,
+      Long start,
+      Long length,
+      Boolean allowInflightInstants,
+      Boolean emitDelete,
+      Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) {
+
+    ValidationUtils.checkArgument(readerContext != null, "Reader context is 
required");
+    ValidationUtils.checkArgument(hoodieTableMetaClient != null, "Hoodie table 
meta client is required");
+    ValidationUtils.checkArgument(latestCommitTime != null, "Latest commit 
time is required");
+    ValidationUtils.checkArgument(dataSchema != null, "Data schema is 
required");
+    ValidationUtils.checkArgument(requestedSchema != null, "Requested schema 
is required");
+    ValidationUtils.checkArgument(props != null, "Props is required");
+    ValidationUtils.checkArgument(partitionPath != null, "Partition path is 
required");
+    
ValidationUtils.checkArgument(hoodieTableMetaClient.getTableConfig().getLogFileFormat()
 == HoodieFileFormat.PARQUET,
+        "LSM file group reader expects parquet log files");
+
+    if (internalSchemaOpt == null) {
+      internalSchemaOpt = Option.empty();
+    }
+    if (baseFileOption == null) {
+      baseFileOption = Option.empty();
+    }
+    if (start == null) {
+      start = 0L;
+    }
+    if (length == null) {
+      length = Long.MAX_VALUE;
+    }
+    if (allowInflightInstants == null) {
+      allowInflightInstants = false;
+    }
+    if (emitDelete == null) {
+      emitDelete = false;
+    }
+    if (fileGroupUpdateCallback == null) {
+      fileGroupUpdateCallback = Option.empty();
+    }
+
+    String tablePath = hoodieTableMetaClient.getBasePath().toString();
+    HoodieStorage storage = hoodieTableMetaClient.getStorage().newInstance(new 
StoragePath(tablePath), readerContext.getStorageConfiguration());
+
+    this.readerParameters = ReaderParameters.builder()
+        .shouldUseRecordPosition(false)
+        .emitDeletes(emitDelete)
+        .sortOutputs(false)
+        .inflightInstantsAllowed(allowInflightInstants)
+        .build();
+    this.inputSplit = InputSplit.builder()
+        .baseFileOption(baseFileOption)
+        .logFileStream(logFiles)
+        .partitionPath(partitionPath)
+        .start(start)
+        .length(length)
+        .build();
+
+    this.readerContext = readerContext;
+    this.fileGroupUpdateCallback = fileGroupUpdateCallback;
+    this.metaClient = hoodieTableMetaClient;
+    this.storage = storage;
+
+    readerContext.setHasLogFiles(this.inputSplit.hasLogFiles());
+    
readerContext.getRecordContext().setPartitionPath(inputSplit.getPartitionPath());
+    if (readerContext.getHasLogFiles() && inputSplit.getStart() != 0) {
+      throw new IllegalArgumentException("LSM file group reader is doing log 
file merge but not reading from the start of the base file");
+    }
+    HoodieTableConfig tableConfig = hoodieTableMetaClient.getTableConfig();
+    this.props = ConfigUtils.getMergeProps(props, tableConfig);
+    readerContext.initRecordMerger(props);
+    readerContext.setTablePath(tablePath);
+    readerContext.setLatestCommitTime(latestCommitTime);
+    readerContext.setShouldMergeUseRecordPosition(false);
+    
readerContext.setHasBootstrapBaseFile(inputSplit.getBaseFileOption().flatMap(HoodieBaseFile::getBootstrapBaseFile).isPresent());
+    
readerContext.setSchemaHandler(readerContext.getRecordContext().supportsParquetRowIndex()
+        ? new ParquetRowIndexBasedSchemaHandler<>(readerContext, dataSchema, 
requestedSchema, internalSchemaOpt, props, metaClient)
+        : new FileGroupReaderSchemaHandler<>(readerContext, dataSchema, 
requestedSchema, internalSchemaOpt, props, metaClient));
+    this.outputConverter = 
readerContext.getSchemaHandler().getOutputConverter();
+    this.orderingFieldNames = 
HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(), 
hoodieTableMetaClient);
+    this.readStats = new HoodieReadStats();
+  }
+
+  /**
+   * Creates a buffered iterator in the requested output mode.
+   *
+   * <p>{@code includeBaseFile} controls whether the L1/base sorted run 
participates in the merge.
+   * Log-only consumers, such as compaction-style readers, pass {@code false} 
so only native parquet
+   * log files are scanned.
+   */
+  private ClosableIterator<BufferedRecord<T>> 
getBufferedRecordIterator(IteratorMode iteratorMode,
+                                                                        
boolean includeBaseFile) throws IOException {
+    this.readerContext.setIteratorMode(iteratorMode);
+    if (lsmRecordIterator != null) {
+      lsmRecordIterator.close();
+    }
+    this.lsmRecordIterator = new LsmFileGroupRecordIterator<>(
+        readerContext, storage, inputSplit, orderingFieldNames, metaClient, 
props, readerParameters, readStats, fileGroupUpdateCallback, includeBaseFile);
+    return new HoodieLsmFileGroupReaderIterator<>(this);
+  }
+
+  @Override
+  public ClosableIterator<BufferedRecord<T>> 
getClosableBufferedRecordIterator() throws IOException {
+    return getBufferedRecordIterator(IteratorMode.HOODIE_RECORD, true);
+  }
+
+  @Override
+  public ClosableIterator<T> getClosableIterator() throws IOException {
+    return new 
CloseableMappingIterator<>(getBufferedRecordIterator(IteratorMode.ENGINE_RECORD,
 true), BufferedRecord::getRecord);
+  }
+
+  public ClosableIterator<HoodieRecord<T>> getClosableHoodieRecordIterator() 
throws IOException {

Review Comment:
   nit: missing `@Override`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to