This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6170ac905273 feat: Add a lsm-tree based FG reader (#18987)
6170ac905273 is described below

commit 6170ac90527344ef7e51e60b173103be3a3956ae
Author: Danny Chan <[email protected]>
AuthorDate: Wed Jun 24 17:07:29 2026 +0800

    feat: Add a lsm-tree based FG reader (#18987)
---
 .../hudi/common/config/HoodieReaderConfig.java     |   7 +
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  95 +++-
 .../apache/hudi/common/model/HoodieLogFile.java    |  37 +-
 .../apache/hudi/common/schema/HoodieSchemas.java   |  45 ++
 .../common/table/read/HoodieFileGroupReader.java   |   3 +-
 .../hudi/common/table/read/HoodieRecordReader.java |  71 +++
 .../apache/hudi/common/table/read/InputSplit.java  |   3 +-
 .../table/read/lsm/HoodieLsmFileGroupReader.java   | 276 +++++++++++
 .../table/read/lsm/LsmFileGroupRecordIterator.java | 542 +++++++++++++++++++++
 .../table/read/lsm/SpillableLsmRecordIterator.java | 173 +++++++
 .../table/view/AbstractTableFileSystemView.java    |   6 +-
 .../hudi/common/model/TestHoodieLogFile.java       | 145 ++++++
 .../hudi/common/table/read/TestInputSplit.java     |  59 +++
 .../read/lsm/TestLsmFileGroupRecordIterator.java   | 148 ++++++
 .../read/lsm/TestSpillableLsmRecordIterator.java   | 105 ++++
 15 files changed, 1694 insertions(+), 21 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
index 9cbab8f4468c..577863d48856 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
@@ -59,6 +59,13 @@ public class HoodieReaderConfig extends HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("Whether to use positions in the block header for 
data blocks containing updates and delete blocks for merging.");
 
+  public static final ConfigProperty<Integer> LSM_SORT_MERGE_SPILL_THRESHOLD = 
ConfigProperty
+      .key("hoodie.lsm.sort.merge.spill.threshold")
+      .defaultValue(16)
+      .markAdvanced()
+      .withDocumentation("Maximum number of sorted LSM input files to keep as 
direct readers during sort merge. "
+          + "When the fan-in is larger, remaining inputs are drained to 
sequential local spill files and read back during merge.");
+
   public static final String REALTIME_SKIP_MERGE = "skip_merge";
   public static final String REALTIME_PAYLOAD_COMBINE = "payload_combine";
   public static final ConfigProperty<String> MERGE_TYPE = ConfigProperty
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 1b5f3ea62e78..b2fb3130d63f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ImmutablePair;
@@ -34,7 +35,6 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieValidationException;
-import org.apache.hudi.exception.InvalidHoodieFileNameException;
 import org.apache.hudi.exception.InvalidHoodiePathException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.HoodieStorage;
@@ -76,9 +76,13 @@ public class FSUtils {
 
   // Log files are of this pattern - 
.b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1_1-0-1
   // Archive log files are of this pattern - .commits_.archive.1_1-0-1
+  // Native log files are of this pattern - 
b5068208-e1a4-11e6-bf01-fe55135034f3_1-0-1_20170101134598_1.log.parquet
+  // For native log files, the file extension is log/deletes/cdc and the 
suffix is the native file format.
   public static final String PATH_SEPARATOR = "/";
   public static final Pattern LOG_FILE_PATTERN =
       
Pattern.compile("^\\.([^._]+)_([^.]*)\\.(log|archive)\\.(\\d+)(_((\\d+)-(\\d+)-(\\d+))(\\.cdc)?)?$");
+  public static final Pattern NATIVE_LOG_FILE_PATTERN =
+      
Pattern.compile("^([^._]+)_((\\d+)-(\\d+)-(\\d+))_([^_]+)_(\\d+)\\.(log|deletes|cdc)\\.([^.]+)$");
   public static final Pattern PREFIX_BY_FILE_ID_PATTERN = 
Pattern.compile("^(.+)-(\\d+)");
   private static final Pattern BASE_FILE_PATTERN = 
Pattern.compile("[a-zA-Z0-9-]+_[a-zA-Z0-9-]+_[0-9]+\\.[a-zA-Z0-9]+");
 
@@ -131,6 +135,10 @@ public class FSUtils {
 
   public static String getCommitTime(String fullFileName) {
     try {
+      Option<Matcher> nativeLogMatcher = matchNativeLogFile(fullFileName);
+      if (nativeLogMatcher.isPresent()) {
+        return nativeLogMatcher.get().group(6);
+      }
       if (isLogFile(fullFileName)) {
         return fullFileName.split("_")[1].split("\\.", 2)[0];
       }
@@ -328,6 +336,10 @@ public class FSUtils {
    * Get the file extension from the log file.
    */
   public static String getFileExtensionFromLog(StoragePath logPath) {
+    Option<Matcher> nativeLogMatcher = matchNativeLogFile(logPath.getName());
+    if (nativeLogMatcher.isPresent()) {
+      return nativeLogMatcher.get().group(8);
+    }
     Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName());
     if (!matcher.matches()) {
       throw new InvalidHoodiePathException(logPath.toString(), "LogFile");
@@ -336,22 +348,19 @@ public class FSUtils {
   }
 
   public static String getFileIdFromFileName(String fileName) {
-    if (FSUtils.isLogFile(fileName)) {
-      Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
-      if (!matcher.matches()) {
-        throw new InvalidHoodieFileNameException(fileName, "LogFile");
-      }
-      return matcher.group(1);
+    Option<Matcher> logFileMatcher = matchLogFile(fileName);
+    if (logFileMatcher.isPresent()) {
+      return logFileMatcher.get().group(1);
     }
     return FSUtils.getFileId(fileName);
   }
 
   public static String getFileIdFromLogPath(StoragePath path) {
-    Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
-    if (!matcher.matches()) {
+    Option<Matcher> logFileMatcher = matchLogFile(path.getName());
+    if (!logFileMatcher.isPresent()) {
       throw new InvalidHoodiePathException(path, "LogFile");
     }
-    return matcher.group(1);
+    return logFileMatcher.get().group(1);
   }
 
   public static String getFileIdFromFilePath(StoragePath filePath) {
@@ -365,6 +374,10 @@ public class FSUtils {
    * Get the second part of the file name in the log file. That will be the 
delta commit time.
    */
   public static String getDeltaCommitTimeFromLogPath(StoragePath path) {
+    Option<Matcher> nativeLogMatcher = matchNativeLogFile(path.getName());
+    if (nativeLogMatcher.isPresent()) {
+      return nativeLogMatcher.get().group(6);
+    }
     Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
     if (!matcher.matches()) {
       throw new InvalidHoodiePathException(path.toString(), "LogFile");
@@ -376,6 +389,10 @@ public class FSUtils {
    * Get TaskPartitionId used in log-path.
    */
   public static Integer getTaskPartitionIdFromLogPath(StoragePath path) {
+    Option<Matcher> nativeLogMatcher = matchNativeLogFile(path.getName());
+    if (nativeLogMatcher.isPresent()) {
+      return Integer.parseInt(nativeLogMatcher.get().group(3));
+    }
     Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
     if (!matcher.matches()) {
       throw new InvalidHoodiePathException(path.toString(), "LogFile");
@@ -388,6 +405,10 @@ public class FSUtils {
    * Get Write-Token used in log-path.
    */
   public static String getWriteTokenFromLogPath(StoragePath path) {
+    Option<Matcher> nativeLogMatcher = matchNativeLogFile(path.getName());
+    if (nativeLogMatcher.isPresent()) {
+      return nativeLogMatcher.get().group(2);
+    }
     Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
     if (!matcher.matches()) {
       throw new InvalidHoodiePathException(path.toString(), "LogFile");
@@ -399,6 +420,10 @@ public class FSUtils {
    * Get StageId used in log-path.
    */
   public static Integer getStageIdFromLogPath(StoragePath path) {
+    Option<Matcher> nativeLogMatcher = matchNativeLogFile(path.getName());
+    if (nativeLogMatcher.isPresent()) {
+      return Integer.parseInt(nativeLogMatcher.get().group(4));
+    }
     Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
     if (!matcher.matches()) {
       throw new InvalidHoodiePathException(path.toString(), "LogFile");
@@ -411,6 +436,10 @@ public class FSUtils {
    * Get Task Attempt Id used in log-path.
    */
   public static Integer getTaskAttemptIdFromLogPath(StoragePath path) {
+    Option<Matcher> nativeLogMatcher = matchNativeLogFile(path.getName());
+    if (nativeLogMatcher.isPresent()) {
+      return Integer.parseInt(nativeLogMatcher.get().group(5));
+    }
     Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
     if (!matcher.matches()) {
       throw new InvalidHoodiePathException(path.toString(), "LogFile");
@@ -427,6 +456,10 @@ public class FSUtils {
   }
 
   public static int getFileVersionFromLog(String logFileName) {
+    Option<Matcher> nativeLogMatcher = matchNativeLogFile(logFileName);
+    if (nativeLogMatcher.isPresent()) {
+      return Integer.parseInt(nativeLogMatcher.get().group(7));
+    }
     Matcher matcher = LOG_FILE_PATTERN.matcher(logFileName);
     if (!matcher.matches()) {
       throw new HoodieIOException("Invalid log file name: " + logFileName);
@@ -443,6 +476,9 @@ public class FSUtils {
   }
 
   public static boolean isBaseFile(StoragePath path) {
+    if (matchNativeLogFile(path.getName()).isPresent()) {
+      return false;
+    }
     String extension = getFileExtension(path.getName());
     if (HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(extension)) {
       return BASE_FILE_PATTERN.matcher(path.getName()).matches();
@@ -466,6 +502,9 @@ public class FSUtils {
   }
 
   public static boolean isLogFile(String fileName) {
+    if (matchNativeLogFile(fileName).isPresent()) {
+      return true;
+    }
     if (fileName.startsWith(LOG_FILE_START_WITH_CHARACTER)) {
       Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
       return matcher.matches() && matcher.group(3).equals(LOG_FILE_EXTENSION);
@@ -473,6 +512,42 @@ public class FSUtils {
     return false;
   }
 
+  /**
+   * Matches {@code fileName} against {@link #NATIVE_LOG_FILE_PATTERN}.
+   *
+   * <p>If {@code fileName} contains {@link StoragePath#SEPARATOR}, only the part after the last
+   * separator is matched, so both bare file names and full paths are accepted.
+   *
+   * <p>Capturing groups of the returned matcher: 1 = file id, 2 = write token,
+   * 3/4/5 = task partition id / stage id / task attempt id, 6 = delta commit time,
+   * 7 = log version, 8 = file extension ({@code log}, {@code deletes} or {@code cdc}),
+   * 9 = native file format suffix.
+   *
+   * @param fileName file name or path; may be null or empty
+   * @return the matched matcher, or {@link Option#empty()} for null/empty input or a name that is
+   *     not a native log file
+   */
+  public static Option<Matcher> matchNativeLogFile(String fileName) {
+    if (StringUtils.isNullOrEmpty(fileName)) {
+      return Option.empty();
+    }
+    // Strip any directory prefix so callers may pass either a bare name or a full path.
+    String actualFileName = fileName.contains(StoragePath.SEPARATOR)
+        ? fileName.substring(fileName.lastIndexOf(StoragePath.SEPARATOR) + 1)
+        : fileName;
+    Matcher matcher = NATIVE_LOG_FILE_PATTERN.matcher(actualFileName);
+    return matcher.matches() ? Option.of(matcher) : Option.empty();
+  }
+
+  /**
+   * Returns whether {@code fileName} is a native log file whose extension is {@code deletes}.
+   *
+   * @param fileName file name or path; may be null or empty
+   * @return {@code true} only for native delete log files; {@code false} for null/empty input,
+   *     legacy log files and native log files with any other extension
+   */
+  public static boolean isNativeDeleteLogFile(String fileName) {
+    return matchNativeLogFile(fileName).map(matcher -> 
"deletes".equals(matcher.group(8))).orElse(false);
+  }
+
+  /**
+   * Returns whether {@code fileName} names a CDC log file, under either naming scheme.
+   *
+   * <p>Native log files are checked by their extension (group 8 of
+   * {@link #NATIVE_LOG_FILE_PATTERN}). Legacy log files are checked by the optional CDC suffix
+   * group (group 10) of {@link #LOG_FILE_PATTERN}.
+   *
+   * @param fileName file name or path; may be null or empty
+   * @return {@code true} if the name is a CDC log file; {@code false} otherwise, including for
+   *     null/empty input
+   */
+  public static boolean isCDCLogFile(String fileName) {
+    if (StringUtils.isNullOrEmpty(fileName)) {
+      return false;
+    }
+    Option<Matcher> nativeLogMatcher = matchNativeLogFile(fileName);
+    if (nativeLogMatcher.isPresent()) {
+      // The native extension group has no leading dot, so drop the first character of the suffix.
+      // NOTE(review): assumes CDC_LOGFILE_SUFFIX is ".cdc", as implied by the (\.cdc) group in
+      // LOG_FILE_PATTERN that the legacy check below compares it against.
+      return 
HoodieCDCUtils.CDC_LOGFILE_SUFFIX.substring(1).equals(nativeLogMatcher.get().group(8));
+    }
+    Matcher matcher = LOG_FILE_PATTERN.matcher(getFileNameFromPath(fileName));
+    return matcher.matches() && 
HoodieCDCUtils.CDC_LOGFILE_SUFFIX.equals(matcher.group(10));
+  }
+
+  /**
+   * Matches {@code fileName} as a native log file first, then as a legacy log file.
+   *
+   * <p>Group 1 is the file id in both patterns. Unlike {@link #matchNativeLogFile}, the legacy
+   * branch matches {@code fileName} as given, without stripping directory components, so callers
+   * should pass a bare file name. The legacy pattern also accepts {@code archive} files. This
+   * differs from {@link #isLogFile(String)}, which additionally checks group 3 against
+   * {@code LOG_FILE_EXTENSION}.
+   *
+   * @param fileName file name to match
+   * @return the matcher of whichever pattern matched, or {@link Option#empty()} if neither did
+   */
+  private static Option<Matcher> matchLogFile(String fileName) {
+    Option<Matcher> nativeLogMatcher = matchNativeLogFile(fileName);
+    if (nativeLogMatcher.isPresent()) {
+      return nativeLogMatcher;
+    }
+    Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
+    return matcher.matches() ? Option.of(matcher) : Option.empty();
+  }
+
   public static boolean isDataFile(StoragePath path) {
     return isBaseFile(path) || isLogFile(path);
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
index 20fb395d401e..eff4205eb2e6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.common.model;
 
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.InvalidHoodiePathException;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
@@ -30,7 +30,10 @@ import lombok.Setter;
 import lombok.ToString;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.regex.Matcher;
 
 import static org.apache.hudi.common.fs.FSUtils.LOG_FILE_PATTERN;
@@ -49,10 +52,20 @@ public class HoodieLogFile implements Serializable {
   public static final String DELTA_EXTENSION = ".log";
   public static final String LOG_FILE_PREFIX = ".";
   public static final Integer LOGFILE_BASE_VERSION = 1;
+  private static final Map<String, Integer> EXTENSION_PRECEDENCE;
 
   private static final Comparator<HoodieLogFile> LOG_FILE_COMPARATOR = new 
LogFileComparator();
   private static final Comparator<HoodieLogFile> LOG_FILE_COMPARATOR_REVERSED 
= new LogFileComparator().reversed();
 
+  static {
+    Map<String, Integer> extensionPrecedence = new HashMap<>();
+    extensionPrecedence.put("log", 0);
+    extensionPrecedence.put("deletes", 1); // the deletes come after logs to 
ensure commit time sequence.
+    extensionPrecedence.put("cdc", 2);
+    extensionPrecedence.put("archive", 3);
+    EXTENSION_PRECEDENCE = Collections.unmodifiableMap(extensionPrecedence);
+  }
+
   @Getter
   @Setter
   private transient StoragePathInfo pathInfo;
@@ -109,6 +122,17 @@ public class HoodieLogFile implements Serializable {
   }
 
   private void parseFieldsFromPath() {
+    Option<Matcher> nativeLogMatcherOpt = 
FSUtils.matchNativeLogFile(getPath().getName());
+    if (nativeLogMatcherOpt.isPresent()) {
+      Matcher matcher = nativeLogMatcherOpt.get();
+      this.fileId = matcher.group(1);
+      this.deltaCommitTime = matcher.group(6);
+      this.fileExtension = matcher.group(8);
+      this.logVersion = Integer.parseInt(matcher.group(7));
+      this.logWriteToken = matcher.group(2);
+      this.suffix = matcher.group(9);
+      return;
+    }
     Matcher matcher = LOG_FILE_PATTERN.matcher(getPath().getName());
     if (!matcher.matches()) {
       throw new InvalidHoodiePathException(path, "LogFile");
@@ -157,7 +181,7 @@ public class HoodieLogFile implements Serializable {
   }
 
   public boolean isCDC() {
-    return getSuffix().equals(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
+    return FSUtils.isCDCLogFile(getFileName());
   }
 
   public String getSuffix() {
@@ -223,8 +247,13 @@ public class HoodieLogFile implements Serializable {
           int compareWriteToken = 
getWriteTokenComparator().compare(o1.getLogWriteToken(), o2.getLogWriteToken());
           if (compareWriteToken == 0) {
 
-            // Compare by suffix when write token is same
-            return o1.getSuffix().compareTo(o2.getSuffix());
+            int p1 = EXTENSION_PRECEDENCE.getOrDefault(o1.getFileExtension(), 
Integer.MAX_VALUE);
+            int p2 = EXTENSION_PRECEDENCE.getOrDefault(o2.getFileExtension(), 
Integer.MAX_VALUE);
+            if (p1 == p2) {
+              // Compare by suffix when file extension is same
+              return o1.getSuffix().compareTo(o2.getSuffix());
+            }
+            return Integer.compare(p1, p2);
           }
 
           // Compare by write token when delta-commit and log-version is same
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemas.java 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemas.java
new file mode 100644
index 000000000000..bb9ce345145d
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemas.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.schema;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.schema.HoodieSchemaUtils.createNewSchemaField;
+
+/**
+ * Factory class for {@link HoodieSchema}.
+ *
+ * <p>Holds only static factory methods and constants; it cannot be instantiated.
+ */
+public final class HoodieSchemas {
+  /**
+   * Name of the record key field in the schema produced by {@link #createDeleteLogSchema}.
+   */
+  public static final String DELETE_LOG_RECORD_KEY_FIELD = "record_key";
+
+  private HoodieSchemas() {
+    // Static factory holder; no instances.
+  }
+
+  /**
+   * Creates the record schema used for delete log records.
+   *
+   * <p>The resulting record is named {@code hudi_delete_log_record}. Its first field is a
+   * {@code STRING} field named {@link #DELETE_LOG_RECORD_KEY_FIELD}. After that comes one field
+   * per ordering field, in the order of {@code orderingFieldNames`}. Each is derived from the
+   * matching {@code tableSchema} field via {@code HoodieSchemaUtils#createNewSchemaField}.
+   *
+   * @param tableSchema        table schema in which the ordering fields are looked up
+   * @param orderingFieldNames names of the ordering fields to include; may be empty
+   * @return the delete log record schema
+   * @throws IllegalArgumentException if an ordering field is not present in {@code tableSchema}
+   */
+  public static HoodieSchema createDeleteLogSchema(HoodieSchema tableSchema, List<String> orderingFieldNames) {
+    List<HoodieSchemaField> fields = Stream.concat(
+        Stream.of(createNewSchemaField(
+            DELETE_LOG_RECORD_KEY_FIELD, HoodieSchema.create(HoodieSchemaType.STRING), null, null)),
+        orderingFieldNames.stream().map(orderingFieldName -> tableSchema.getField(orderingFieldName)
+            .map(HoodieSchemaUtils::createNewSchemaField)
+            .orElseThrow(() ->
+                new IllegalArgumentException("Ordering field " + orderingFieldName + " not found in table schema"))))
+        .collect(Collectors.toList());
+    return HoodieSchema.createRecord("hudi_delete_log_record", null, null, fields);
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 0b536b511db1..f5812ced4f47 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -51,7 +51,6 @@ import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Getter;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -70,7 +69,7 @@ import java.util.stream.Stream;
  *            in Spark and {@code RowData} in Flink.
  */
 @AllArgsConstructor
-public final class HoodieFileGroupReader<T> implements Closeable {
+public final class HoodieFileGroupReader<T> implements HoodieRecordReader<T> {
 
   private final HoodieReaderContext<T> readerContext;
   private final HoodieTableMetaClient metaClient;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieRecordReader.java
new file mode 100644
index 000000000000..5a60fd47872a
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieRecordReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Common readable surface for record readers over a Hudi file group.
+ *
+ * <p>Implemented by {@code HoodieFileGroupReader} and by the LSM reader
+ * {@code HoodieLsmFileGroupReader}. Readers are {@link Closeable}; close them to release the
+ * resources they hold.
+ *
+ * @param <T> The type of engine-specific record representation.
+ */
+public interface HoodieRecordReader<T> extends Closeable {
+
+  /**
+   * Returns buffered records carrying the Hudi metadata needed by merge/write paths.
+   *
+   * @return iterator over buffered records; the caller should close it
+   * @throws IOException if the iterator cannot be created
+   */
+  ClosableIterator<BufferedRecord<T>> getClosableBufferedRecordIterator() 
throws IOException;
+
+  /**
+   * Returns final engine-specific records.
+   *
+   * @return iterator over engine records; the caller should close it
+   * @throws IOException if the iterator cannot be created
+   */
+  ClosableIterator<T> getClosableIterator() throws IOException;
+
+  /**
+   * Returns final records wrapped as {@link HoodieRecord}s.
+   *
+   * @return iterator over wrapped records; the caller should close it
+   * @throws IOException if the iterator cannot be created
+   */
+  ClosableIterator<HoodieRecord<T>> getClosableHoodieRecordIterator() throws 
IOException;
+
+  /**
+   * Returns only the record keys from the file group.
+   *
+   * @return iterator over record keys; the caller should close it
+   * @throws IOException if the iterator cannot be created
+   */
+  ClosableIterator<String> getClosableKeyIterator() throws IOException;
+
+  /**
+   * Returns records that come from log files only.
+   *
+   * @return iterator over buffered log records; the caller should close it
+   * @throws IOException if the iterator cannot be created
+   */
+  ClosableIterator<BufferedRecord<T>> getLogRecordsOnly() throws IOException;
+
+  /**
+   * Returns the read statistics collected by this reader.
+   *
+   * @return this reader's read stats
+   */
+  HoodieReadStats getReadStats();
+
+  /**
+   * Notifies the reader that writing a record failed, so callbacks can run cleanup or bookkeeping.
+   *
+   * @param recordKey key of the failed record
+   */
+  void onWriteFailure(String recordKey);
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
index a0f1a3d04e90..6b8761db5f7c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 
@@ -77,8 +76,8 @@ public class InputSplit {
     if (logFileStream != null) {
       // Process Log Files (if provided)
       this.logFiles = logFileStream
+          .filter(logFile -> !logFile.isCDC())
           .sorted(HoodieLogFile.getLogFileComparator())
-          .filter(logFile -> 
!logFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
           .collect(Collectors.toList());
       this.recordIterator = Option.empty();
     } else if (recordIterator != null) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/HoodieLsmFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/HoodieLsmFileGroupReader.java
new file mode 100644
index 000000000000..507cbe46c708
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/HoodieLsmFileGroupReader.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.BaseFileUpdateCallback;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.HoodieRecordReader;
+import org.apache.hudi.common.table.read.InputSplit;
+import org.apache.hudi.common.table.read.IteratorMode;
+import org.apache.hudi.common.table.read.ParquetRowIndexBasedSchemaHandler;
+import org.apache.hudi.common.table.read.ReaderParameters;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import lombok.Builder;
+import lombok.Getter;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.UnaryOperator;
+import java.util.stream.Stream;
+
+/**
+ * Record reader for RFC-103 LSM file groups backed by native parquet log 
files.
+ *
+ * <p>This reader is intentionally separate from {@code 
HoodieFileGroupReader}. Callers should use it
+ * only when the file group follows pure LSM sorted-file semantics: the 
optional base file is treated
+ * as the L1 sorted run and native parquet log files are treated as L0 sorted 
runs. Mixed legacy log
+ * file groups should continue to use {@code HoodieFileGroupReader}.
+ *
+ * <p>The reader owns file-group level setup that mirrors {@code 
HoodieFileGroupReader}: schema
+ * handling, merge properties, iterator mode, output projection, read stats, 
and update callbacks.
+ * The actual sorted k-way merge is delegated to {@link 
LsmFileGroupRecordIterator}.
+ */
+public final class HoodieLsmFileGroupReader<T> implements 
HoodieRecordReader<T> {
+
+  private final HoodieReaderContext<T> readerContext;
+  private final HoodieTableMetaClient metaClient;
+  private final InputSplit inputSplit;
+  private final List<String> orderingFieldNames;
+  private final HoodieStorage storage;
+  private final TypedProperties props;
+  private final ReaderParameters readerParameters;
+  private final Option<UnaryOperator<T>> outputConverter;
+  private final Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback;
+  private ClosableIterator<BufferedRecord<T>> lsmRecordIterator;
+  @Getter
+  private final HoodieReadStats readStats;
+
+  /**
+   * Creates an LSM file group reader. Use the Lombok-generated builder (setter prefix {@code with}).
+   *
+   * <p>Required arguments, checked with {@code ValidationUtils.checkArgument}: {@code readerContext},
+   * {@code hoodieTableMetaClient}, {@code latestCommitTime}, {@code dataSchema},
+   * {@code requestedSchema}, {@code props} and {@code partitionPath}. The table's log file format
+   * must also be {@link HoodieFileFormat#PARQUET}.
+   *
+   * <p>Optional arguments left {@code null} default as follows:
+   * <ul>
+   *   <li>{@code internalSchemaOpt}, {@code baseFileOption} and {@code fileGroupUpdateCallback} to
+   *   {@link Option#empty()};</li>
+   *   <li>{@code start} to {@code 0};</li>
+   *   <li>{@code length} to {@link Long#MAX_VALUE};</li>
+   *   <li>{@code allowInflightInstants} and {@code emitDelete} to {@code false}.</li>
+   * </ul>
+   *
+   * <p>The constructor also configures {@code readerContext}: log-file presence, partition path,
+   * record merger, table path, latest commit time, bootstrap flag and schema handler.
+   *
+   * @throws IllegalArgumentException if the split has log files but {@code start} is not 0
+   */
+  @Builder(setterPrefix = "with")
+  private HoodieLsmFileGroupReader(
+      HoodieReaderContext<T> readerContext,
+      String latestCommitTime,
+      HoodieSchema dataSchema,
+      HoodieSchema requestedSchema,
+      Option<InternalSchema> internalSchemaOpt,
+      HoodieTableMetaClient hoodieTableMetaClient,
+      TypedProperties props,
+      Option<HoodieBaseFile> baseFileOption,
+      Stream<HoodieLogFile> logFiles,
+      String partitionPath,
+      Long start,
+      Long length,
+      Boolean allowInflightInstants,
+      Boolean emitDelete,
+      Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) {
+
+    ValidationUtils.checkArgument(readerContext != null, "Reader context is 
required");
+    ValidationUtils.checkArgument(hoodieTableMetaClient != null, "Hoodie table 
meta client is required");
+    ValidationUtils.checkArgument(latestCommitTime != null, "Latest commit 
time is required");
+    ValidationUtils.checkArgument(dataSchema != null, "Data schema is 
required");
+    ValidationUtils.checkArgument(requestedSchema != null, "Requested schema 
is required");
+    ValidationUtils.checkArgument(props != null, "Props is required");
+    ValidationUtils.checkArgument(partitionPath != null, "Partition path is 
required");
+    // Only parquet ("native") log files are supported by the LSM reader.
+    
ValidationUtils.checkArgument(hoodieTableMetaClient.getTableConfig().getLogFileFormat()
 == HoodieFileFormat.PARQUET,
+        "LSM file group reader expects parquet log files");
+
+    // Normalize optional builder arguments that were left unset (null) to their defaults.
+    if (internalSchemaOpt == null) {
+      internalSchemaOpt = Option.empty();
+    }
+    if (baseFileOption == null) {
+      baseFileOption = Option.empty();
+    }
+    if (start == null) {
+      start = 0L;
+    }
+    if (length == null) {
+      length = Long.MAX_VALUE;
+    }
+    if (allowInflightInstants == null) {
+      allowInflightInstants = false;
+    }
+    if (emitDelete == null) {
+      emitDelete = false;
+    }
+    if (fileGroupUpdateCallback == null) {
+      fileGroupUpdateCallback = Option.empty();
+    }
+
+    // A storage instance for this reader, rooted at the table base path and built from the
+    // reader context's storage configuration.
+    String tablePath = hoodieTableMetaClient.getBasePath().toString();
+    HoodieStorage storage = hoodieTableMetaClient.getStorage().newInstance(new 
StoragePath(tablePath), readerContext.getStorageConfiguration());
+
+    // Record positions are never used for merging here (also disabled on readerContext below).
+    this.readerParameters = ReaderParameters.builder()
+        .shouldUseRecordPosition(false)
+        .emitDeletes(emitDelete)
+        .sortOutputs(false)
+        .inflightInstantsAllowed(allowInflightInstants)
+        .build();
+    this.inputSplit = InputSplit.builder()
+        .baseFileOption(baseFileOption)
+        .logFileStream(logFiles)
+        .partitionPath(partitionPath)
+        .start(start)
+        .length(length)
+        .build();
+
+    this.readerContext = readerContext;
+    this.fileGroupUpdateCallback = fileGroupUpdateCallback;
+    this.metaClient = hoodieTableMetaClient;
+    this.storage = storage;
+
+    readerContext.setHasLogFiles(this.inputSplit.hasLogFiles());
+    
readerContext.getRecordContext().setPartitionPath(inputSplit.getPartitionPath());
+    // Merging log files requires reading the base file from its beginning.
+    if (readerContext.getHasLogFiles() && inputSplit.getStart() != 0) {
+      throw new IllegalArgumentException("LSM file group reader is doing log 
file merge but not reading from the start of the base file");
+    }
+    HoodieTableConfig tableConfig = hoodieTableMetaClient.getTableConfig();
+    // NOTE(review): this.props holds the merge props, but initRecordMerger below and both schema
+    // handlers receive the raw constructor argument `props`, not this.props. Confirm this is
+    // intended (it may deliberately mirror HoodieFileGroupReader).
+    this.props = ConfigUtils.getMergeProps(props, tableConfig);
+    readerContext.initRecordMerger(props);
+    readerContext.setTablePath(tablePath);
+    readerContext.setLatestCommitTime(latestCommitTime);
+    readerContext.setShouldMergeUseRecordPosition(false);
+    
readerContext.setHasBootstrapBaseFile(inputSplit.getBaseFileOption().flatMap(HoodieBaseFile::getBootstrapBaseFile).isPresent());
+    // Choose the row-index based schema handler when the record context supports parquet row index.
+    
readerContext.setSchemaHandler(readerContext.getRecordContext().supportsParquetRowIndex()
+        ? new ParquetRowIndexBasedSchemaHandler<>(readerContext, dataSchema, 
requestedSchema, internalSchemaOpt, props, metaClient)
+        : new FileGroupReaderSchemaHandler<>(readerContext, dataSchema, 
requestedSchema, internalSchemaOpt, props, metaClient));
+    this.outputConverter = 
readerContext.getSchemaHandler().getOutputConverter();
+    this.orderingFieldNames = 
HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(), 
hoodieTableMetaClient);
+    this.readStats = new HoodieReadStats();
+  }
+
+  /**
+   * Creates a buffered iterator in the requested output mode.
+   *
+   * <p>{@code includeBaseFile} controls whether the L1/base sorted run participates in the merge.
+   * Log-only consumers, such as compaction-style readers, pass {@code false} so only native parquet
+   * log files are scanned.
+   *
+   * <p>The reader keeps at most one open merge iterator: any iterator created by a previous call is
+   * closed first. The field is cleared even if that close fails, so a failure while building the
+   * replacement never leaves this reader pointing at an already-closed iterator.
+   *
+   * @param iteratorMode    output mode pushed into the reader context before the merge starts
+   * @param includeBaseFile whether the L1/base file is merged together with the L0 log files
+   * @return an iterator that pulls merged records through this reader
+   * @throws IOException if any sorted run cannot be opened
+   */
+  private ClosableIterator<BufferedRecord<T>> getBufferedRecordIterator(IteratorMode iteratorMode,
+                                                                        boolean includeBaseFile) throws IOException {
+    this.readerContext.setIteratorMode(iteratorMode);
+    if (lsmRecordIterator != null) {
+      try {
+        lsmRecordIterator.close();
+      } finally {
+        // Drop the stale reference so a failed construction below cannot resurrect a closed iterator.
+        this.lsmRecordIterator = null;
+      }
+    }
+    this.lsmRecordIterator = new LsmFileGroupRecordIterator<>(
+        readerContext, storage, inputSplit, orderingFieldNames, metaClient, props, readerParameters, readStats, fileGroupUpdateCallback, includeBaseFile);
+    return new HoodieLsmFileGroupReaderIterator<>(this);
+  }
+
+  @Override
+  public ClosableIterator<BufferedRecord<T>> getClosableBufferedRecordIterator() throws IOException {
+    // Full file-group view: base file plus all log files, kept in buffered-record form.
+    final boolean includeBaseFile = true;
+    return getBufferedRecordIterator(IteratorMode.HOODIE_RECORD, includeBaseFile);
+  }
+
+  @Override
+  public ClosableIterator<T> getClosableIterator() throws IOException {
+    ClosableIterator<BufferedRecord<T>> buffered = getBufferedRecordIterator(IteratorMode.ENGINE_RECORD, true);
+    // Unwrap every merged buffered record into its engine-native payload.
+    return new CloseableMappingIterator<>(buffered, BufferedRecord::getRecord);
+  }
+
+  @Override
+  public ClosableIterator<HoodieRecord<T>> getClosableHoodieRecordIterator() throws IOException {
+    ClosableIterator<BufferedRecord<T>> buffered = getBufferedRecordIterator(IteratorMode.HOODIE_RECORD, true);
+    // Each merged buffered record is materialized as a HoodieRecord by the record context.
+    return new CloseableMappingIterator<>(buffered,
+        merged -> readerContext.getRecordContext().constructFinalHoodieRecord(merged));
+  }
+
+  @Override
+  public ClosableIterator<String> getClosableKeyIterator() throws IOException {
+    ClosableIterator<BufferedRecord<T>> buffered = getBufferedRecordIterator(IteratorMode.RECORD_KEY, true);
+    // Only the record key of each merged record is exposed to the caller.
+    return new CloseableMappingIterator<>(buffered, BufferedRecord::getRecordKey);
+  }
+
+  @Override
+  public ClosableIterator<BufferedRecord<T>> getLogRecordsOnly() throws IOException {
+    // Leave the L1/base run out so only L0 log records participate in the merge.
+    final boolean includeBaseFile = false;
+    return getBufferedRecordIterator(IteratorMode.HOODIE_RECORD, includeBaseFile);
+  }
+
+  /**
+   * Reports whether the active merge iterator has another record; used by
+   * {@link HoodieLsmFileGroupReaderIterator}.
+   */
+  boolean hasNext() {
+    return this.lsmRecordIterator.hasNext();
+  }
+
+  /**
+   * Returns the next merged record, projected through the output converter when one is configured.
+   */
+  BufferedRecord<T> next() {
+    BufferedRecord<T> merged = this.lsmRecordIterator.next();
+    return outputConverter.isPresent() ? merged.project(outputConverter.get()) : merged;
+  }
+
+  @Override
+  public void onWriteFailure(String recordKey) {
+    // Forward the failed key to the file-group update callback, when one was supplied.
+    fileGroupUpdateCallback.ifPresent(updateCallback -> updateCallback.onFailure(recordKey));
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Nothing has been opened until one of the iterator factory methods has been called.
+    if (lsmRecordIterator == null) {
+      return;
+    }
+    lsmRecordIterator.close();
+  }
+
+  /**
+   * Iterator handed out to callers. It delegates to the owning reader and closes that reader when
+   * it is itself closed.
+   *
+   * <p>Once closed, the iterator reports no further elements: {@link #hasNext()} returns
+   * {@code false} and {@link #next()} throws {@link java.util.NoSuchElementException}.
+   */
+  private static class HoodieLsmFileGroupReaderIterator<T> implements ClosableIterator<BufferedRecord<T>> {
+    private HoodieLsmFileGroupReader<T> reader;
+
+    private HoodieLsmFileGroupReaderIterator(HoodieLsmFileGroupReader<T> reader) {
+      this.reader = reader;
+    }
+
+    @Override
+    public boolean hasNext() {
+      // A closed iterator has released its reader, so there is nothing left to return.
+      return reader != null && reader.hasNext();
+    }
+
+    @Override
+    public BufferedRecord<T> next() {
+      if (reader == null) {
+        throw new java.util.NoSuchElementException("The LSM file group reader iterator is already closed");
+      }
+      return reader.next();
+    }
+
+    @Override
+    public void close() {
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (IOException e) {
+          throw new HoodieIOException("Failed to close the reader", e);
+        } finally {
+          // Always release the reference so repeated close() calls are no-ops.
+          this.reader = null;
+        }
+      }
+    }
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java
new file mode 100644
index 000000000000..6b162240baee
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemas;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.BaseFileUpdateCallback;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.table.read.BufferedRecords;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.InputSplit;
+import org.apache.hudi.common.table.read.ReaderParameters;
+import org.apache.hudi.common.table.read.UpdateProcessor;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
+import static 
org.apache.hudi.common.config.HoodieReaderConfig.LSM_SORT_MERGE_SPILL_THRESHOLD;
+import static 
org.apache.hudi.common.schema.HoodieSchemaCompatibility.areSchemasProjectionEquivalent;
+import static 
org.apache.hudi.io.util.FileIOUtils.getDefaultSpillableMapBasePath;
+
+/**
+ * Streaming sorted-merge iterator for RFC-103 LSM file groups.
+ *
+ * <p>The iterator merges one optional L1/base sorted run with zero or more L0/native parquet log
+ * runs. Every input file must already be sorted by record key; this class does not sort records
+ * within a file. The active head record from each participating run is tracked by a loser tree,
+ * which provides efficient k-way merge behavior while preserving deterministic source ordering for
+ * records with the same key.
+ *
+ * <p>Merge order follows the same conflict resolution model as {@code HoodieFileGroupReader}: the
+ * L1/base file is processed first, and L0 log files are processed in file-group log order so newer
+ * log instants or versions can win when ordering values tie. Native delete logs contain only delete
+ * metadata and are converted into {@link BufferedRecord} delete records before entering the merge.
+ *
+ * <p>To reduce open-reader memory pressure when a file group has many L0 runs, the iterator can spill
+ * selected L0 file iterators to sequential temporary files. Spilling changes how an input run is
+ * buffered, but not its merge order or merge semantics. The L1/base iterator is always kept direct,
+ * and native delete logs plus smaller L0 files are prioritized for direct reading.
+ *
+ * <p>Construction is exception-safe. If opening or priming any sorted run fails, every run that was
+ * already opened is closed before the original failure is propagated.
+ */
+public class LsmFileGroupRecordIterator<T> implements ClosableIterator<BufferedRecord<T>> {
+
+  private final HoodieReaderContext<T> readerContext;
+  private final HoodieStorage storage;
+  private final InputSplit inputSplit;
+  private final HoodieSchema readerSchema;
+  private final List<String> orderingFieldNames;
+  private final boolean includeBaseFile;
+  private final BufferedRecordMerger<T> bufferedRecordMerger;
+  private final UpdateProcessor<T> updateProcessor;
+  private final LoserTree<T> readers;
+  private final int spillThreshold;
+  private final String spillBasePath;
+  private BufferedRecord<T> nextRecord;
+
+  /**
+   * Creates an iterator that merges both the L1/base file, when present, and all L0 native log files.
+   */
+  public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext,
+                                    HoodieStorage storage,
+                                    InputSplit inputSplit,
+                                    List<String> orderingFieldNames,
+                                    HoodieTableMetaClient metaClient,
+                                    TypedProperties props,
+                                    ReaderParameters readerParameters,
+                                    HoodieReadStats readStats,
+                                    Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) throws IOException {
+    this(readerContext, storage, inputSplit, orderingFieldNames, metaClient, props, readerParameters, readStats, fileGroupUpdateCallback, true);
+  }
+
+  /**
+   * Creates an iterator over an LSM file group.
+   *
+   * @param includeBaseFile whether the L1/base file should be included in the merge. Passing
+   *                        {@code false} produces a log-only view for callers that only need L0 data.
+   * @throws IOException if any sorted run cannot be opened; runs opened before the failure are closed
+   */
+  public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext,
+                                    HoodieStorage storage,
+                                    InputSplit inputSplit,
+                                    List<String> orderingFieldNames,
+                                    HoodieTableMetaClient metaClient,
+                                    TypedProperties props,
+                                    ReaderParameters readerParameters,
+                                    HoodieReadStats readStats,
+                                    Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback,
+                                    boolean includeBaseFile) throws IOException {
+    this.readerContext = readerContext;
+    this.storage = storage;
+    this.inputSplit = inputSplit;
+    this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
+    this.orderingFieldNames = orderingFieldNames;
+    this.includeBaseFile = includeBaseFile;
+    this.bufferedRecordMerger = BufferedRecordMergerFactory.create(
+        readerContext, readerContext.getMergeMode(), false, readerContext.getRecordMerger(),
+        readerSchema, readerContext.getPayloadClasses(props), props, metaClient.getTableConfig().getPartialUpdateMode());
+    this.updateProcessor = UpdateProcessor.create(readStats, readerContext, readerParameters.isEmitDeletes(), fileGroupUpdateCallback, props);
+    this.spillThreshold = Math.max(0, props.getInteger(LSM_SORT_MERGE_SPILL_THRESHOLD.key(), LSM_SORT_MERGE_SPILL_THRESHOLD.defaultValue()));
+    this.spillBasePath = props.getString(SPILLABLE_MAP_BASE_PATH.key(), getDefaultSpillableMapBasePath());
+    List<SortedRunReader<T>> sortedRunReaders = initializeReaders();
+    try {
+      this.readers = new LoserTree<>(sortedRunReaders);
+    } catch (RuntimeException e) {
+      // Building the tree compares head records; release every opened run if that fails.
+      closeAll(sortedRunReaders, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Builds one sorted-run reader for each sorted run that has at least one record.
+   *
+   * <p>The assigned {@code mergeOrder} is the stable source precedence used when multiple runs expose
+   * the same key. It is assigned before spill selection so direct and spilled iterators remain
+   * semantically identical during the loser-tree merge.
+   *
+   * <p>If any run fails to open, the runs opened so far are closed before the failure is rethrown,
+   * so the caller never receives a half-initialized iterator with leaked file handles.
+   */
+  private List<SortedRunReader<T>> initializeReaders() throws IOException {
+    List<SortedRunReader<T>> sortedRunReaders = new ArrayList<>();
+    try {
+      int mergeOrder = 0;
+      boolean hasBaseFileReader = includeBaseFile && inputSplit.getBaseFileOption().isPresent();
+      if (hasBaseFileReader) {
+        addReader(sortedRunReaders, mergeOrder++, createBaseFileIterator(inputSplit.getBaseFileOption().get()));
+      }
+
+      List<LogReaderSpec> logReaderSpecs = new ArrayList<>();
+      for (HoodieLogFile logFile : inputSplit.getLogFiles()) {
+        logReaderSpecs.add(new LogReaderSpec(mergeOrder++, logFile));
+      }
+      Set<Integer> directLogMergeOrders = selectDirectLogMergeOrders(logReaderSpecs, hasBaseFileReader);
+      for (LogReaderSpec spec : logReaderSpecs) {
+        ClosableIterator<BufferedRecord<T>> iterator =
+            createFileIterator(spec.logFile.getPathInfo(), spec.logFile.getPath(), spec.logFile.getFileSize());
+        addReader(sortedRunReaders, spec.mergeOrder, maybeSpillIterator(directLogMergeOrders.contains(spec.mergeOrder), iterator));
+      }
+      return sortedRunReaders;
+    } catch (IOException | RuntimeException e) {
+      closeAll(sortedRunReaders, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Closes every reader in {@code sortedRunReaders}, attaching close failures to {@code primary} as
+   * suppressed exceptions so the caller still sees the original failure.
+   */
+  private static void closeAll(List<? extends SortedRunReader<?>> sortedRunReaders, Throwable primary) {
+    for (SortedRunReader<?> sortedRunReader : sortedRunReaders) {
+      closeQuietly(sortedRunReader, primary);
+    }
+  }
+
+  /**
+   * Closes a single reader, recording any close failure as suppressed on {@code primary}.
+   */
+  private static void closeQuietly(SortedRunReader<?> sortedRunReader, Throwable primary) {
+    try {
+      sortedRunReader.close();
+    } catch (RuntimeException closeFailure) {
+      // Throwable#addSuppressed rejects self-suppression.
+      if (closeFailure != primary) {
+        primary.addSuppressed(closeFailure);
+      }
+    }
+  }
+
+  /**
+   * Selects which L0 log readers stay direct under the configured spill threshold.
+   *
+   * <p>The base/L1 reader consumes one direct-reader slot when present and is never spilled. Remaining
+   * direct-reader budget is spent on native delete logs first, then smaller log files, because those
+   * readers tend to be cheaper to keep open while avoiding unnecessary spill materialization.
+   */
+  private Set<Integer> selectDirectLogMergeOrders(List<LogReaderSpec> logReaderSpecs, boolean hasBaseFileReader) {
+    return selectDirectLogMergeOrders(logReaderSpecs, hasBaseFileReader, spillThreshold);
+  }
+
+  @VisibleForTesting
+  static Set<Integer> selectDirectLogMergeOrders(List<LogReaderSpec> logReaderSpecs,
+                                                 boolean hasBaseFileReader,
+                                                 int spillThreshold) {
+    int directLogBudget = spillThreshold - (hasBaseFileReader ? 1 : 0);
+    if (directLogBudget <= 0) {
+      return new HashSet<>();
+    }
+    Set<Integer> directMergeOrders = new HashSet<>();
+    logReaderSpecs.stream()
+        .sorted(Comparator
+            .comparing((LogReaderSpec spec) -> !spec.nativeDeleteLog)
+            .thenComparingLong(spec -> spec.fileSize)
+            .thenComparingInt(spec -> spec.mergeOrder))
+        .limit(directLogBudget)
+        .forEach(spec -> directMergeOrders.add(spec.mergeOrder));
+    return directMergeOrders;
+  }
+
+  /**
+   * Returns the original iterator when it is selected for direct reading, otherwise materializes it
+   * into a sequential spill iterator.
+   */
+  private ClosableIterator<BufferedRecord<T>> maybeSpillIterator(boolean directReader,
+                                                                 ClosableIterator<BufferedRecord<T>> iterator) {
+    if (directReader) {
+      return iterator;
+    }
+    return new SpillableLsmRecordIterator<>(iterator, readerContext.getRecordSerializer(), readerContext.getRecordContext(), spillBasePath);
+  }
+
+  /**
+   * Metadata used only for choosing the direct-versus-spilled L0 reader plan.
+   */
+  @VisibleForTesting
+  static class LogReaderSpec {
+    final int mergeOrder;
+    final HoodieLogFile logFile;
+    final boolean nativeDeleteLog;
+    final long fileSize;
+
+    LogReaderSpec(int mergeOrder, HoodieLogFile logFile) {
+      this.mergeOrder = mergeOrder;
+      this.logFile = logFile;
+      this.nativeDeleteLog = FSUtils.isNativeDeleteLogFile(logFile.getFileName());
+      // Unknown sizes sort last so files with a known, small size are preferred for direct reading.
+      this.fileSize = logFile.getFileSize() >= 0 ? logFile.getFileSize() : Long.MAX_VALUE;
+    }
+  }
+
+  /**
+   * Adds a reader to the merge only when the underlying sorted run contains at least one record.
+   *
+   * <p>An empty run is closed immediately. If priming the run fails, it is closed here as well,
+   * because it has not yet been handed to {@code sortedRunReaders}.
+   */
+  private void addReader(List<SortedRunReader<T>> sortedRunReaders, int mergeOrder, ClosableIterator<BufferedRecord<T>> iterator) {
+    SortedRunReader<T> sortedRunReader = new SortedRunReader<>(mergeOrder, iterator);
+    boolean hasRecord;
+    try {
+      hasRecord = sortedRunReader.advance();
+    } catch (RuntimeException e) {
+      closeQuietly(sortedRunReader, e);
+      throw e;
+    }
+    if (hasRecord) {
+      sortedRunReaders.add(sortedRunReader);
+    } else {
+      sortedRunReader.close();
+    }
+  }
+
+  /**
+   * Creates the L1/base sorted-run iterator.
+   */
+  private ClosableIterator<BufferedRecord<T>> createBaseFileIterator(HoodieBaseFile baseFile) throws IOException {
+    if (baseFile.getBootstrapBaseFile().isPresent()) {
+      // Bootstrap base files require joining the skeleton file with the external data file.
+      // Keep that path on HoodieFileGroupReader until the LSM reader implements the same merge.
+      throw new UnsupportedOperationException("LSM file group reader does not support bootstrap base files");
+    }
+    return createFileIterator(baseFile.getPathInfo(), baseFile.getStoragePath(), baseFile.getFileSize());
+  }
+
+  /**
+   * Creates a sorted-run iterator for a parquet data file or a native parquet log file.
+   *
+   * <p>Native delete logs use a specialized schema and are routed through
+   * {@link #createNativeDeleteLogIterator(StoragePathInfo, StoragePath, long)}.
+   */
+  private ClosableIterator<BufferedRecord<T>> createFileIterator(StoragePathInfo pathInfo,
+                                                                 StoragePath path,
+                                                                 long fileSize) throws IOException {
+    StoragePath storagePath = pathInfo != null ? pathInfo.getPath() : path;
+    if (FSUtils.isNativeDeleteLogFile(storagePath.getName())) {
+      return createNativeDeleteLogIterator(pathInfo, storagePath, fileSize);
+    }
+    Pair<HoodieSchema, Map<String, String>> requiredSchemaAndRenamedColumns =
+        readerContext.getSchemaHandler().getRequiredSchemaForFileAndRenamedColumns(storagePath);
+    HoodieSchema fileRequiredSchema = requiredSchemaAndRenamedColumns.getLeft();
+    ClosableIterator<T> recordIterator;
+    if (pathInfo != null) {
+      recordIterator = readerContext.getFileRecordIterator(
+          pathInfo, 0, pathInfo.getLength(), readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage);
+    } else {
+      long length = fileSize >= 0 ? fileSize : storage.getPathInfo(storagePath).getLength();
+      recordIterator = readerContext.getFileRecordIterator(
+          storagePath, 0, length, readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage);
+    }
+    if (!areSchemasProjectionEquivalent(fileRequiredSchema, readerSchema) || !requiredSchemaAndRenamedColumns.getRight().isEmpty()) {
+      UnaryOperator<T> projector = readerContext.getRecordContext()
+          .projectRecord(fileRequiredSchema, readerSchema, requiredSchemaAndRenamedColumns.getRight());
+      recordIterator = new CloseableMappingIterator<>(recordIterator, projector);
+    }
+    if (readerContext.getInstantRange().isPresent()) {
+      recordIterator = readerContext.applyInstantRangeFilter(recordIterator);
+    }
+    return new CloseableMappingIterator<>(recordIterator, record -> BufferedRecords.fromEngineRecord(
+        readerContext.getRecordContext().seal(record),
+        readerSchema,
+        readerContext.getRecordContext(),
+        orderingFieldNames,
+        readerContext.getRecordContext().isDeleteRecord(record, readerContext.getSchemaHandler().getDeleteContext().withReaderSchema(readerSchema))));
+  }
+
+  /**
+   * Creates delete records from an RFC-103 native delete log.
+   *
+   * <p>The delete log schema intentionally contains only the record key and ordering fields;
+   * partition path and full data columns are not read for delete-only logs.
+   */
+  private ClosableIterator<BufferedRecord<T>> createNativeDeleteLogIterator(StoragePathInfo pathInfo,
+                                                                            StoragePath storagePath,
+                                                                            long fileSize) throws IOException {
+    HoodieSchema deleteLogSchema = HoodieSchemas.createDeleteLogSchema(
+        readerContext.getSchemaHandler().getTableSchema(), orderingFieldNames);
+    ClosableIterator<T> recordIterator;
+    if (pathInfo != null) {
+      recordIterator = readerContext.getFileRecordIterator(
+          pathInfo, 0, pathInfo.getLength(), deleteLogSchema, deleteLogSchema, storage);
+    } else {
+      long length = fileSize >= 0 ? fileSize : storage.getPathInfo(storagePath).getLength();
+      recordIterator = readerContext.getFileRecordIterator(
+          storagePath, 0, length, deleteLogSchema, deleteLogSchema, storage);
+    }
+    return new CloseableMappingIterator<>(recordIterator,
+        record -> createNativeDeleteRecord(readerContext, record, deleteLogSchema, orderingFieldNames));
+  }
+
+  @VisibleForTesting
+  static <T> BufferedRecord<T> createNativeDeleteRecord(HoodieReaderContext<T> readerContext,
+                                                        T record,
+                                                        HoodieSchema deleteLogSchema,
+                                                        List<String> orderingFieldNames) {
+    Object recordKey = readerContext.getRecordContext()
+        .getValue(record, deleteLogSchema, HoodieSchemas.DELETE_LOG_RECORD_KEY_FIELD);
+    // Preserve the delete log's ordering value so event-time/custom merge modes can compare
+    // deletes against data records instead of treating every native delete as commit-time ordered.
+    Comparable orderingValue =
+        readerContext.getRecordContext().getOrderingValue(record, deleteLogSchema, orderingFieldNames);
+    return BufferedRecords.createDelete(recordKey.toString(), orderingValue);
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (nextRecord != null) {
+      return true;
+    }
+    while (!readers.isEmpty()) {
+      BufferedRecord<T> mergedRecord = nextMergedRecord();
+      nextRecord = updateProcessor.processUpdate(
+          mergedRecord.getRecordKey(), null, mergedRecord, mergedRecord.isDelete());
+      if (nextRecord != null) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Pops and merges all currently visible versions for the next record key.
+   */
+  private BufferedRecord<T> nextMergedRecord() {
+    BufferedRecord<T> firstRecord = readers.peekWinner();
+    String recordKey = firstRecord.getRecordKey();
+    BufferedRecord<T> mergedRecord = null;
+    while (!readers.isEmpty() && recordKey.equals(readers.peekWinner().getRecordKey())) {
+      mergedRecord = merge(mergedRecord, readers.popWinner());
+    }
+    return mergedRecord;
+  }
+
+  /**
+   * Applies the configured buffered-record merger so later sources can win when ordering values tie.
+   */
+  private BufferedRecord<T> merge(BufferedRecord<T> existingRecord, BufferedRecord<T> newRecord) {
+    if (existingRecord == null) {
+      return newRecord;
+    }
+    try {
+      return bufferedRecordMerger.deltaMerge(newRecord, existingRecord).orElse(existingRecord);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to merge LSM records for key " + newRecord.getRecordKey(), e);
+    }
+  }
+
+  @Override
+  public BufferedRecord<T> next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    BufferedRecord<T> record = nextRecord;
+    nextRecord = null;
+    return record;
+  }
+
+  @Override
+  public void close() {
+    readers.close();
+  }
+
+  /**
+   * Loser-tree state machine for k-way merging. Each leaf keeps one active record from
+   * one sorted input stream; {@code tree[0]} stores the current champion and internal
+   * nodes store the loser from the corresponding tournament match.
+   */
+  @VisibleForTesting
+  static class LoserTree<T> {
+    private final List<SortedRunReader<T>> leaves;
+    private final int leafBase;
+    private final int[] tree;
+    private final int[] winners;
+
+    LoserTree(List<SortedRunReader<T>> leaves) {
+      this.leaves = leaves;
+      this.leafBase = nextPowerOfTwo(Math.max(1, leaves.size()));
+      this.tree = new int[leafBase];
+      this.winners = new int[leafBase << 1];
+      Arrays.fill(tree, -1);
+      Arrays.fill(winners, -1);
+      build();
+    }
+
+    private void build() {
+      for (int i = 0; i < leaves.size(); i++) {
+        winners[leafBase + i] = leaves.get(i).current == null ? -1 : i;
+      }
+      if (leafBase == 1) {
+        tree[0] = winners[leafBase];
+      } else {
+        for (int node = leafBase - 1; node > 0; node--) {
+          replay(node);
+        }
+      }
+    }
+
+    boolean isEmpty() {
+      return tree[0] < 0;
+    }
+
+    BufferedRecord<T> peekWinner() {
+      int winnerIndex = tree[0];
+      return winnerIndex < 0 ? null : leaves.get(winnerIndex).current;
+    }
+
+    /**
+     * Removes and returns the current champion record, advancing the run it came from.
+     *
+     * @throws NoSuchElementException if every run is exhausted
+     */
+    BufferedRecord<T> popWinner() {
+      int winnerIndex = tree[0];
+      if (winnerIndex < 0) {
+        throw new NoSuchElementException("All LSM sorted runs are exhausted");
+      }
+      SortedRunReader<T> winner = leaves.get(winnerIndex);
+      BufferedRecord<T> record = winner.current;
+      if (!winner.advance()) {
+        winner.close();
+      }
+      update(winnerIndex);
+      return record;
+    }
+
+    private void update(int leafIndex) {
+      winners[leafBase + leafIndex] = leaves.get(leafIndex).current == null ? -1 : leafIndex;
+      if (leafBase == 1) {
+        tree[0] = winners[leafBase];
+        return;
+      }
+      int node = (leafBase + leafIndex) >> 1;
+      while (node > 0) {
+        replay(node);
+        node >>= 1;
+      }
+    }
+
+    private void replay(int node) {
+      int left = winners[node << 1];
+      int right = winners[(node << 1) + 1];
+      if (left < 0 && right < 0) {
+        winners[node] = -1;
+        tree[node] = -1;
+      } else if (left < 0) {
+        winners[node] = right;
+        tree[node] = -1;
+      } else if (right < 0) {
+        winners[node] = left;
+        tree[node] = -1;
+      } else {
+        if (compare(left, right) <= 0) {
+          winners[node] = left;
+          tree[node] = right;
+        } else {
+          winners[node] = right;
+          tree[node] = left;
+        }
+      }
+      if (node == 1) {
+        tree[0] = winners[node];
+      }
+    }
+
+    private int compare(int leftIndex, int rightIndex) {
+      SortedRunReader<T> left = leaves.get(leftIndex);
+      SortedRunReader<T> right = leaves.get(rightIndex);
+      int keyCompare = left.current.getRecordKey().compareTo(right.current.getRecordKey());
+      if (keyCompare != 0) {
+        return keyCompare;
+      }
+      // Process older sources first so the regular merger sees later sources last.
+      // This preserves HoodieFileGroupReader tie semantics when ordering values are equal:
+      // base < older log instant/version < newer log instant/version.
+      return Integer.compare(left.mergeOrder, right.mergeOrder);
+    }
+
+    /**
+     * Closes every leaf even if some fail; the first failure is rethrown with later ones suppressed.
+     */
+    private void close() {
+      RuntimeException failure = null;
+      for (SortedRunReader<T> leaf : leaves) {
+        try {
+          leaf.close();
+        } catch (RuntimeException e) {
+          if (failure == null) {
+            failure = e;
+          } else if (failure != e) {
+            failure.addSuppressed(e);
+          }
+        }
+      }
+      if (failure != null) {
+        throw failure;
+      }
+    }
+
+    private static int nextPowerOfTwo(int value) {
+      int result = 1;
+      while (result < value) {
+        result <<= 1;
+      }
+      return result;
+    }
+  }
+
+  @VisibleForTesting
+  static class SortedRunReader<T> {
+    private final int mergeOrder;
+    private final ClosableIterator<BufferedRecord<T>> iterator;
+    private BufferedRecord<T> current;
+    private boolean closed;
+
+    SortedRunReader(int mergeOrder, ClosableIterator<BufferedRecord<T>> iterator) {
+      this.mergeOrder = mergeOrder;
+      this.iterator = iterator;
+    }
+
+    boolean advance() {
+      if (iterator.hasNext()) {
+        current = iterator.next();
+        return true;
+      }
+      current = null;
+      return false;
+    }
+
+    /**
+     * Closes the underlying iterator at most once.
+     */
+    void close() {
+      if (closed) {
+        return;
+      }
+      // Mark closed first so a failing close is not retried on a partially released iterator.
+      closed = true;
+      iterator.close();
+    }
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/SpillableLsmRecordIterator.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/SpillableLsmRecordIterator.java
new file mode 100644
index 000000000000..8702562024ed
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/SpillableLsmRecordIterator.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.engine.RecordContext;
+import org.apache.hudi.common.serialization.CustomSerializer;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieIOException;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/**
+ * Sequential disk-backed iterator for sorted LSM inputs.
+ *
+ * <p>The source iterator is drained into a length-prefixed spill file and 
closed. The resulting
+ * iterator reads the records back sequentially, which matches the loser-tree 
access pattern.
+ */
+class SpillableLsmRecordIterator<T> implements 
ClosableIterator<BufferedRecord<T>> {
+
+  private static final int BUFFER_SIZE = 128 * 1024;
+  private static final String SPILL_FILE_PREFIX = "hudi-lsm-";
+  private static final String SPILL_FILE_SUFFIX = ".spill";
+
+  private final CustomSerializer<BufferedRecord<T>> serializer;
+  private final RecordContext<T> recordContext;
+  private final File spillFile;
+  private final long recordCount;
+  private DataInputStream inputStream;
+  private long recordsRead;
+  private BufferedRecord<T> nextRecord;
+  private boolean closed;
+
+  SpillableLsmRecordIterator(ClosableIterator<BufferedRecord<T>> 
sourceIterator,
+                             CustomSerializer<BufferedRecord<T>> serializer,
+                             RecordContext<T> recordContext,
+                             String spillBasePath) {
+    this.serializer = serializer;
+    this.recordContext = recordContext;
+    Throwable spillFailure = null;
+    try {
+      Path spillDirectory = Paths.get(spillBasePath);
+      Files.createDirectories(spillDirectory);
+      this.spillFile = Files.createTempFile(spillDirectory, SPILL_FILE_PREFIX, 
SPILL_FILE_SUFFIX).toFile();
+      this.recordCount = spill(sourceIterator);
+    } catch (IOException e) {
+      spillFailure = e;
+      throw new HoodieIOException("Failed to spill LSM input iterator", e);
+    } catch (RuntimeException e) {
+      spillFailure = e;
+      throw e;
+    } finally {
+      closeSourceIterator(sourceIterator, spillFailure);
+    }
+  }
+
+  private long spill(ClosableIterator<BufferedRecord<T>> sourceIterator) 
throws IOException {
+    long count = 0;
+    try (DataOutputStream outputStream = new DataOutputStream(new 
BufferedOutputStream(new FileOutputStream(spillFile), BUFFER_SIZE))) {
+      while (sourceIterator.hasNext()) {
+        byte[] bytes = 
serializer.serialize(sourceIterator.next().toBinary(recordContext));
+        outputStream.writeInt(bytes.length);
+        outputStream.write(bytes);
+        count++;
+      }
+    } catch (IOException | RuntimeException e) {
+      deleteSpillFile();
+      throw e;
+    }
+    return count;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (nextRecord != null) {
+      return true;
+    }
+    if (recordsRead >= recordCount) {
+      return false;
+    }
+    try {
+      ensureInputStream();
+      int length = inputStream.readInt();
+      byte[] bytes = new byte[length];
+      inputStream.readFully(bytes);
+      nextRecord = serializer.deserialize(bytes);
+      recordsRead++;
+      return true;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to read spilled LSM input iterator", 
e);
+    }
+  }
+
+  @Override
+  public BufferedRecord<T> next() {
+    if (!hasNext()) {
+      throw new java.util.NoSuchElementException();
+    }
+    BufferedRecord<T> record = nextRecord;
+    nextRecord = null;
+    return record;
+  }
+
+  @Override
+  public void close() {
+    if (closed) {
+      return;
+    }
+    try {
+      if (inputStream != null) {
+        inputStream.close();
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to close spilled LSM input 
iterator", e);
+    } finally {
+      deleteSpillFile();
+      closed = true;
+    }
+  }
+
+  private void ensureInputStream() throws IOException {
+    if (inputStream == null) {
+      inputStream = new DataInputStream(new 
BufferedInputStream(Files.newInputStream(spillFile.toPath()), BUFFER_SIZE));
+    }
+  }
+
+  private void deleteSpillFile() {
+    try {
+      Files.deleteIfExists(spillFile.toPath());
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to delete spilled LSM input file " + 
spillFile, e);
+    }
+  }
+
+  private void closeSourceIterator(ClosableIterator<BufferedRecord<T>> 
sourceIterator,
+                                   Throwable spillFailure) {
+    try {
+      sourceIterator.close();
+    } catch (RuntimeException e) {
+      if (spillFailure != null) {
+        spillFailure.addSuppressed(e);
+      } else {
+        throw e;
+      }
+    }
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index c9468f152fbd..0c68f9ee4215 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -70,7 +70,6 @@ import 
java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 import java.util.function.Function;
 import java.util.function.Predicate;
-import java.util.regex.Matcher;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -509,6 +508,8 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
       // 2. file is not .hoodie_partition_metadata
       if 
(pathName.startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) 
{
         return false;
+      } else if (FSUtils.isLogFile(pathInfo.getPath())) {
+        return false;
       } else if (isMultipleBaseFileFormatsEnabled) {
         return pathName.contains(HoodieFileFormat.PARQUET.getFileExtension())
             || pathName.contains(HoodieFileFormat.ORC.getFileExtension())
@@ -529,8 +530,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
     String logFileExtension = 
metaClient.getTableConfig().getLogFileFormat().getFileExtension();
     Predicate<StoragePathInfo> rtFilePredicate = pathInfo -> {
       String fileName = pathInfo.getPath().getName();
-      Matcher matcher = FSUtils.LOG_FILE_PATTERN.matcher(fileName);
-      return matcher.matches() && fileName.contains(logFileExtension);
+      return FSUtils.isLogFile(pathInfo.getPath()) && 
fileName.contains(logFileExtension);
     };
     return 
pathInfoList.stream().filter(rtFilePredicate).map(HoodieLogFile::new);
   }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java
index 616801c7fb75..3285386b8a82 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java
@@ -18,12 +18,18 @@
 
 package org.apache.hudi.common.model;
 
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
+import java.util.List;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieLogFile {
   private final String pathStr = 
"file:///tmp/hoodie/2021/01/01/.136281f3-c24e-423b-a65a-95dbfbddce1d_100.log.2_1-0-1";
@@ -77,6 +83,145 @@ public class TestHoodieLogFile {
     assertFileGetters(pathWithSuffix, null, hoodieLogFile, -1, suffix);
   }
 
+  @Test
+  void createFromNativeParquetLogFile() {
+    String nativeLogPathStr = "file:///tmp/hoodie/2021/01/01/"
+        + 
"136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_20250409161256974_2.log.parquet";
+    StoragePath nativeLogPath = new StoragePath(nativeLogPathStr);
+    HoodieLogFile hoodieLogFile = new HoodieLogFile(nativeLogPath);
+
+    assertTrue(FSUtils.isLogFile(nativeLogPath));
+    assertFalse(FSUtils.isNativeDeleteLogFile(nativeLogPath.getName()));
+    assertFalse(FSUtils.isBaseFile(nativeLogPath));
+    assertEquals(fileId, hoodieLogFile.getFileId());
+    assertEquals("20250409161256974", hoodieLogFile.getDeltaCommitTime());
+    assertEquals(2, hoodieLogFile.getLogVersion());
+    assertEquals("1-0-1", hoodieLogFile.getLogWriteToken());
+    assertEquals("log", hoodieLogFile.getFileExtension());
+    assertEquals("parquet", hoodieLogFile.getSuffix());
+    assertEquals("log", FSUtils.getFileExtensionFromLog(nativeLogPath));
+    assertEquals("20250409161256974", 
FSUtils.getCommitTime(nativeLogPath.getName()));
+    assertEquals("20250409161256974", 
FSUtils.getDeltaCommitTimeFromLogPath(nativeLogPath));
+    assertEquals(2, FSUtils.getFileVersionFromLog(nativeLogPath));
+    assertEquals("1-0-1", FSUtils.getWriteTokenFromLogPath(nativeLogPath));
+    assertEquals(1, FSUtils.getTaskPartitionIdFromLogPath(nativeLogPath));
+    assertEquals(0, FSUtils.getStageIdFromLogPath(nativeLogPath));
+    assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(nativeLogPath));
+  }
+
+  @Test
+  void createFromNativeDeleteParquetLogFile() {
+    String nativeDeleteLogPathStr = "file:///tmp/hoodie/2021/01/01/"
+        + 
"136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_20250409161256974_3.deletes.parquet";
+    StoragePath nativeDeleteLogPath = new StoragePath(nativeDeleteLogPathStr);
+    HoodieLogFile hoodieLogFile = new HoodieLogFile(nativeDeleteLogPath);
+
+    assertTrue(FSUtils.isLogFile(nativeDeleteLogPath));
+    assertTrue(FSUtils.isNativeDeleteLogFile(nativeDeleteLogPath.getName()));
+    assertFalse(FSUtils.isBaseFile(nativeDeleteLogPath));
+    assertEquals(fileId, hoodieLogFile.getFileId());
+    assertEquals("20250409161256974", hoodieLogFile.getDeltaCommitTime());
+    assertEquals(3, hoodieLogFile.getLogVersion());
+    assertEquals("1-0-1", hoodieLogFile.getLogWriteToken());
+    assertEquals("deletes", hoodieLogFile.getFileExtension());
+    assertEquals("parquet", hoodieLogFile.getSuffix());
+    assertEquals("deletes", 
FSUtils.getFileExtensionFromLog(nativeDeleteLogPath));
+  }
+
+  @Test
+  void createFromNativeCdcParquetLogFile() {
+    String nativeCdcLogPathStr = "file:///tmp/hoodie/2021/01/01/"
+        + 
"136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_20250409161256974_4.cdc.parquet";
+    StoragePath nativeCdcLogPath = new StoragePath(nativeCdcLogPathStr);
+    HoodieLogFile hoodieLogFile = new HoodieLogFile(nativeCdcLogPath);
+
+    assertTrue(FSUtils.isLogFile(nativeCdcLogPath));
+    assertFalse(FSUtils.isNativeDeleteLogFile(nativeCdcLogPath.getName()));
+    assertFalse(FSUtils.isBaseFile(nativeCdcLogPath));
+    assertEquals(fileId, hoodieLogFile.getFileId());
+    assertEquals("20250409161256974", hoodieLogFile.getDeltaCommitTime());
+    assertEquals(4, hoodieLogFile.getLogVersion());
+    assertEquals("1-0-1", hoodieLogFile.getLogWriteToken());
+    assertEquals("cdc", hoodieLogFile.getFileExtension());
+    assertEquals("parquet", hoodieLogFile.getSuffix());
+    assertEquals("cdc", FSUtils.getFileExtensionFromLog(nativeCdcLogPath));
+    assertTrue(hoodieLogFile.isCDC());
+    assertTrue(FSUtils.isCDCLogFile(nativeCdcLogPathStr));
+  }
+
+  @Test
+  void createFromNativeLanceLogFile() {
+    String nativeLogPathStr = "file:///tmp/hoodie/2021/01/01/"
+        + 
"136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_20250409161256974_5.log.lance";
+    StoragePath nativeLogPath = new StoragePath(nativeLogPathStr);
+    HoodieLogFile hoodieLogFile = new HoodieLogFile(nativeLogPath);
+
+    assertTrue(FSUtils.isLogFile(nativeLogPath));
+    assertFalse(FSUtils.isNativeDeleteLogFile(nativeLogPath.getName()));
+    assertFalse(FSUtils.isBaseFile(nativeLogPath));
+    assertFalse(hoodieLogFile.isCDC());
+    assertEquals(fileId, hoodieLogFile.getFileId());
+    assertEquals("20250409161256974", hoodieLogFile.getDeltaCommitTime());
+    assertEquals(5, hoodieLogFile.getLogVersion());
+    assertEquals("1-0-1", hoodieLogFile.getLogWriteToken());
+    assertEquals("log", hoodieLogFile.getFileExtension());
+    assertEquals("lance", hoodieLogFile.getSuffix());
+  }
+
+  @Test
+  void createFromNativeCdcLanceLogFile() {
+    String nativeCdcLogPathStr = "file:///tmp/hoodie/2021/01/01/"
+        + 
"136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_20250409161256974_6.cdc.lance";
+    StoragePath nativeCdcLogPath = new StoragePath(nativeCdcLogPathStr);
+    HoodieLogFile hoodieLogFile = new HoodieLogFile(nativeCdcLogPath);
+
+    assertTrue(FSUtils.isLogFile(nativeCdcLogPath));
+    assertFalse(FSUtils.isNativeDeleteLogFile(nativeCdcLogPath.getName()));
+    assertFalse(FSUtils.isBaseFile(nativeCdcLogPath));
+    assertEquals(fileId, hoodieLogFile.getFileId());
+    assertEquals("20250409161256974", hoodieLogFile.getDeltaCommitTime());
+    assertEquals(6, hoodieLogFile.getLogVersion());
+    assertEquals("1-0-1", hoodieLogFile.getLogWriteToken());
+    assertEquals("cdc", hoodieLogFile.getFileExtension());
+    assertEquals("lance", hoodieLogFile.getSuffix());
+    assertTrue(hoodieLogFile.isCDC());
+    assertTrue(FSUtils.isCDCLogFile(nativeCdcLogPathStr));
+  }
+
+  @Test
+  void createFromNativeLogFileWithUnknownFormatSuffix() {
+    String nativeLogPathStr = "file:///tmp/hoodie/2021/01/01/"
+        + 
"136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_20250409161256974_7.log.custom";
+    StoragePath nativeLogPath = new StoragePath(nativeLogPathStr);
+    HoodieLogFile hoodieLogFile = new HoodieLogFile(nativeLogPath);
+
+    assertTrue(FSUtils.isLogFile(nativeLogPath));
+    assertFalse(FSUtils.isNativeDeleteLogFile(nativeLogPath.getName()));
+    assertFalse(FSUtils.isBaseFile(nativeLogPath));
+    assertFalse(hoodieLogFile.isCDC());
+    assertEquals(fileId, hoodieLogFile.getFileId());
+    assertEquals("20250409161256974", hoodieLogFile.getDeltaCommitTime());
+    assertEquals(7, hoodieLogFile.getLogVersion());
+    assertEquals("1-0-1", hoodieLogFile.getLogWriteToken());
+    assertEquals("log", hoodieLogFile.getFileExtension());
+    assertEquals("custom", hoodieLogFile.getSuffix());
+    assertFalse(FSUtils.isCDCLogFile(nativeLogPathStr));
+  }
+
+  @Test
+  void logFileComparatorOrdersNativeDeletesAfterLogsForSameVersion() {
+    HoodieLogFile logFile = new HoodieLogFile(new StoragePath(
+        
"/tmp/136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_20250409161256974_8.log.parquet"));
+    HoodieLogFile deleteFile = new HoodieLogFile(new StoragePath(
+        
"/tmp/136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_20250409161256974_8.deletes.parquet"));
+
+    List<HoodieLogFile> logFiles = Arrays.asList(deleteFile, logFile);
+    logFiles.sort(HoodieLogFile.getLogFileComparator());
+
+    assertEquals(logFile, logFiles.get(0));
+    assertEquals(deleteFile, logFiles.get(1));
+  }
+
   private void assertFileGetters(StoragePathInfo pathInfo, HoodieLogFile 
hoodieLogFile,
                                  long fileLength) {
     assertFileGetters(pathStr, pathInfo, hoodieLogFile, fileLength, "");
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestInputSplit.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestInputSplit.java
new file mode 100644
index 000000000000..4ef72d8cf96b
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestInputSplit.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.storage.StoragePath;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestInputSplit {
+
+  @Test
+  void filtersLegacyAndNativeCdcLogFiles() {
+    HoodieLogFile legacyLogFile = new HoodieLogFile(new StoragePath(
+        "/tmp/.file1_001.log.1_1-0-1"));
+    HoodieLogFile legacyCdcLogFile = new HoodieLogFile(new StoragePath(
+        "/tmp/.file1_001.log.2_1-0-1.cdc"));
+    HoodieLogFile nativeLogFile = new HoodieLogFile(new StoragePath(
+        "/tmp/file1_1-0-1_001_1.log.parquet"));
+    HoodieLogFile nativeCdcLogFile = new HoodieLogFile(new StoragePath(
+        "/tmp/file1_1-0-1_001_2.cdc.parquet"));
+    HoodieLogFile nativeLanceLogFile = new HoodieLogFile(new StoragePath(
+        "/tmp/file1_1-0-1_001_3.log.lance"));
+    HoodieLogFile nativeCdcLanceLogFile = new HoodieLogFile(new StoragePath(
+        "/tmp/file1_1-0-1_001_4.cdc.lance"));
+
+    InputSplit inputSplit = InputSplit.builder()
+        .logFileStream(Arrays.asList(legacyLogFile, legacyCdcLogFile, 
nativeLogFile, nativeCdcLogFile, nativeLanceLogFile, 
nativeCdcLanceLogFile).stream())
+        .build();
+
+    List<HoodieLogFile> logFiles = inputSplit.getLogFiles();
+    assertEquals(3, logFiles.size());
+    assertEquals(legacyLogFile, logFiles.get(0));
+    assertEquals(nativeLogFile, logFiles.get(1));
+    assertEquals(nativeLanceLogFile, logFiles.get(2));
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/lsm/TestLsmFileGroupRecordIterator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/lsm/TestLsmFileGroupRecordIterator.java
new file mode 100644
index 000000000000..006f8cac207e
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/lsm/TestLsmFileGroupRecordIterator.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.RecordContext;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemas;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecords;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.storage.StoragePath;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestLsmFileGroupRecordIterator {
+
+  @Test
+  void testDeleteLogSchemaUsesRecordKeyAndOrderingFields() {
+    HoodieSchema deleteLogSchema = 
HoodieSchemas.createDeleteLogSchema(tableSchema(), Arrays.asList("ts"));
+
+    assertEquals(Arrays.asList("record_key", "ts"), 
deleteLogSchema.getFields().stream()
+        .map(HoodieSchemaField::name)
+        .collect(Collectors.toList()));
+    assertEquals(HoodieSchemaType.STRING, 
deleteLogSchema.getField("record_key").get().schema().getType());
+    assertEquals(HoodieSchemaType.LONG, 
deleteLogSchema.getField("ts").get().schema().getType());
+  }
+
+  @Test
+  void testLoserTreeMergesByRecordKeyThenMergeOrder() {
+    List<LsmFileGroupRecordIterator.SortedRunReader<String>> readers = 
Arrays.asList(
+        sortedRunReader(0, record("key1", "base-key1"), record("key3", 
"base-key3")),
+        sortedRunReader(1, record("key1", "log1-key1"), record("key2", 
"log1-key2")),
+        sortedRunReader(2, BufferedRecords.createDelete("key1", 3), 
record("key3", "log2-key3")));
+
+    LsmFileGroupRecordIterator.LoserTree<String> loserTree = new 
LsmFileGroupRecordIterator.LoserTree<>(readers);
+
+    assertEquals(Arrays.asList(
+        "key1:base-key1",
+        "key1:log1-key1",
+        "key1:DELETE",
+        "key2:log1-key2",
+        "key3:base-key3",
+        "key3:log2-key3"), drain(loserTree));
+  }
+
+  @Test
+  void testNativeDeleteRecordPreservesOrderingValue() {
+    HoodieReaderContext<Map<String, Object>> readerContext = 
mock(HoodieReaderContext.class);
+    RecordContext<Map<String, Object>> recordContext = 
mock(RecordContext.class);
+    Map<String, Object> record = Collections.emptyMap();
+    List<String> orderingFields = Collections.singletonList("ts");
+    HoodieSchema deleteLogSchema = 
HoodieSchemas.createDeleteLogSchema(tableSchema(), orderingFields);
+
+    when(readerContext.getRecordContext()).thenReturn(recordContext);
+    when(recordContext.getValue(record, deleteLogSchema, 
"record_key")).thenReturn("key1");
+    when(recordContext.getOrderingValue(eq(record), eq(deleteLogSchema), 
eq(orderingFields))).thenReturn(42L);
+
+    BufferedRecord<Map<String, Object>> deleteRecord =
+        LsmFileGroupRecordIterator.createNativeDeleteRecord(readerContext, 
record, deleteLogSchema, orderingFields);
+
+    assertEquals("key1", deleteRecord.getRecordKey());
+    assertEquals(42L, deleteRecord.getOrderingValue());
+    assertTrue(deleteRecord.isDelete());
+  }
+
+  @Test
+  void testSelectDirectLogReadersPrioritizesDeletesThenSmallFiles() {
+    List<LsmFileGroupRecordIterator.LogReaderSpec> logReaderSpecs = 
Arrays.asList(
+        new LsmFileGroupRecordIterator.LogReaderSpec(1, 
logFile("file1_1-0-1_001_1.log.parquet", 10)),
+        new LsmFileGroupRecordIterator.LogReaderSpec(2, 
logFile("file1_1-0-1_002_1.deletes.parquet", 100)),
+        new LsmFileGroupRecordIterator.LogReaderSpec(3, 
logFile("file1_1-0-1_003_1.log.parquet", 5)));
+
+    Set<Integer> directReadersWithBase =
+        LsmFileGroupRecordIterator.selectDirectLogMergeOrders(logReaderSpecs, 
true, 3);
+    Set<Integer> directReadersWithoutBase =
+        LsmFileGroupRecordIterator.selectDirectLogMergeOrders(logReaderSpecs, 
false, 2);
+
+    assertEquals(new HashSet<>(Arrays.asList(2, 3)), directReadersWithBase);
+    assertEquals(new HashSet<>(Arrays.asList(2, 3)), directReadersWithoutBase);
+  }
+
+  private static LsmFileGroupRecordIterator.SortedRunReader<String> 
sortedRunReader(int mergeOrder,
+                                                                               
     BufferedRecord<String>... records) {
+    LsmFileGroupRecordIterator.SortedRunReader<String> reader = new 
LsmFileGroupRecordIterator.SortedRunReader<>(
+        mergeOrder, ClosableIterator.wrap(Arrays.asList(records).iterator()));
+    assertTrue(reader.advance());
+    return reader;
+  }
+
+  private static BufferedRecord<String> record(String recordKey, String value) 
{
+    return new BufferedRecord<>(recordKey, 1, value, null, null);
+  }
+
+  private static HoodieSchema tableSchema() {
+    return HoodieSchema.createRecord("test_record", null, null, Arrays.asList(
+        HoodieSchemaField.of("id", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+        HoodieSchemaField.of("ts", 
HoodieSchema.create(HoodieSchemaType.LONG))));
+  }
+
+  private static List<String> 
drain(LsmFileGroupRecordIterator.LoserTree<String> loserTree) {
+    List<String> records = new ArrayList<>();
+    while (!loserTree.isEmpty()) {
+      BufferedRecord<String> record = loserTree.popWinner();
+      records.add(record.getRecordKey() + ":" + (record.isDelete() ? "DELETE" 
: record.getRecord()));
+    }
+    return records;
+  }
+
+  private static HoodieLogFile logFile(String fileName, long fileSize) {
+    return new HoodieLogFile(new StoragePath("/tmp/" + fileName), fileSize);
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/lsm/TestSpillableLsmRecordIterator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/lsm/TestSpillableLsmRecordIterator.java
new file mode 100644
index 000000000000..7984435ea2b6
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/lsm/TestSpillableLsmRecordIterator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.serialization.DefaultSerializer;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestSpillableLsmRecordIterator {
+
+  @TempDir
+  Path tempDir;
+
+  @Test
+  void testSpillAndReadBackSequentially() throws IOException {
+    List<BufferedRecord<String>> records = Arrays.asList(
+        new BufferedRecord<>("key1", 1, null, null, null),
+        new BufferedRecord<>("key2", 2, null, null, null),
+        new BufferedRecord<>("key3", 3, null, null, null));
+
+    SpillableLsmRecordIterator<String> iterator = new 
SpillableLsmRecordIterator<>(
+        ClosableIterator.wrap(records.iterator()), new DefaultSerializer<>(), 
null, tempDir.toString());
+
+    assertEquals(1, spillFileCount());
+    assertTrue(iterator.hasNext());
+    assertTrue(iterator.hasNext());
+    assertEquals(records.get(0), iterator.next());
+    assertEquals(records.get(1), iterator.next());
+    assertEquals(records.get(2), iterator.next());
+    assertFalse(iterator.hasNext());
+
+    iterator.close();
+    assertEquals(0, spillFileCount());
+  }
+
+  @Test
+  void testSpillFailurePreservesSourceCloseFailureAsSuppressed() throws 
IOException {
+    Path spillBaseFile = Files.createTempFile(tempDir, "spill-base", ".tmp");
+    RuntimeException closeFailure = new RuntimeException("source close 
failed");
+
+    HoodieIOException exception = assertThrows(HoodieIOException.class, () -> 
new SpillableLsmRecordIterator<>(
+        closeFailingIterator(closeFailure), new DefaultSerializer<>(), null, 
spillBaseFile.toString()));
+
+    assertSame(closeFailure, exception.getCause().getSuppressed()[0]);
+  }
+
+  private long spillFileCount() throws IOException {
+    try (Stream<Path> paths = Files.list(tempDir)) {
+      return paths.count();
+    }
+  }
+
+  private ClosableIterator<BufferedRecord<String>> 
closeFailingIterator(RuntimeException closeFailure) {
+    return new ClosableIterator<BufferedRecord<String>>() {
+      @Override
+      public boolean hasNext() {
+        return false;
+      }
+
+      @Override
+      public BufferedRecord<String> next() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void close() {
+        throw closeFailure;
+      }
+    };
+  }
+}


Reply via email to