This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6170ac905273 feat: Add a lsm-tree based FG reader (#18987)
6170ac905273 is described below
commit 6170ac90527344ef7e51e60b173103be3a3956ae
Author: Danny Chan <[email protected]>
AuthorDate: Wed Jun 24 17:07:29 2026 +0800
feat: Add a lsm-tree based FG reader (#18987)
---
.../hudi/common/config/HoodieReaderConfig.java | 7 +
.../java/org/apache/hudi/common/fs/FSUtils.java | 95 +++-
.../apache/hudi/common/model/HoodieLogFile.java | 37 +-
.../apache/hudi/common/schema/HoodieSchemas.java | 45 ++
.../common/table/read/HoodieFileGroupReader.java | 3 +-
.../hudi/common/table/read/HoodieRecordReader.java | 71 +++
.../apache/hudi/common/table/read/InputSplit.java | 3 +-
.../table/read/lsm/HoodieLsmFileGroupReader.java | 276 +++++++++++
.../table/read/lsm/LsmFileGroupRecordIterator.java | 542 +++++++++++++++++++++
.../table/read/lsm/SpillableLsmRecordIterator.java | 173 +++++++
.../table/view/AbstractTableFileSystemView.java | 6 +-
.../hudi/common/model/TestHoodieLogFile.java | 145 ++++++
.../hudi/common/table/read/TestInputSplit.java | 59 +++
.../read/lsm/TestLsmFileGroupRecordIterator.java | 148 ++++++
.../read/lsm/TestSpillableLsmRecordIterator.java | 105 ++++
15 files changed, 1694 insertions(+), 21 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
index 9cbab8f4468c..577863d48856 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
@@ -59,6 +59,13 @@ public class HoodieReaderConfig extends HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("Whether to use positions in the block header for
data blocks containing updates and delete blocks for merging.");
+ public static final ConfigProperty<Integer> LSM_SORT_MERGE_SPILL_THRESHOLD =
ConfigProperty
+ .key("hoodie.lsm.sort.merge.spill.threshold")
+ .defaultValue(16)
+ .markAdvanced()
+ .withDocumentation("Maximum number of sorted LSM input files to keep as
direct readers during sort merge. "
+ + "When the fan-in is larger, remaining inputs are drained to
sequential local spill files and read back during merge.");
+
public static final String REALTIME_SKIP_MERGE = "skip_merge";
public static final String REALTIME_PAYLOAD_COMBINE = "payload_combine";
public static final ConfigProperty<String> MERGE_TYPE = ConfigProperty
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 1b5f3ea62e78..b2fb3130d63f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
@@ -34,7 +35,6 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieValidationException;
-import org.apache.hudi.exception.InvalidHoodieFileNameException;
import org.apache.hudi.exception.InvalidHoodiePathException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.HoodieStorage;
@@ -76,9 +76,13 @@ public class FSUtils {
// Log files are of this pattern -
.b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1_1-0-1
// Archive log files are of this pattern - .commits_.archive.1_1-0-1
+ // Native log files are of this pattern -
b5068208-e1a4-11e6-bf01-fe55135034f3_1-0-1_20170101134598_1.log.parquet
+ // For native log files, the file extension is log/deletes/cdc and the
suffix is the native file format.
public static final String PATH_SEPARATOR = "/";
public static final Pattern LOG_FILE_PATTERN =
Pattern.compile("^\\.([^._]+)_([^.]*)\\.(log|archive)\\.(\\d+)(_((\\d+)-(\\d+)-(\\d+))(\\.cdc)?)?$");
+ public static final Pattern NATIVE_LOG_FILE_PATTERN =
+
Pattern.compile("^([^._]+)_((\\d+)-(\\d+)-(\\d+))_([^_]+)_(\\d+)\\.(log|deletes|cdc)\\.([^.]+)$");
public static final Pattern PREFIX_BY_FILE_ID_PATTERN =
Pattern.compile("^(.+)-(\\d+)");
private static final Pattern BASE_FILE_PATTERN =
Pattern.compile("[a-zA-Z0-9-]+_[a-zA-Z0-9-]+_[0-9]+\\.[a-zA-Z0-9]+");
@@ -131,6 +135,10 @@ public class FSUtils {
public static String getCommitTime(String fullFileName) {
try {
+ Option<Matcher> nativeLogMatcher = matchNativeLogFile(fullFileName);
+ if (nativeLogMatcher.isPresent()) {
+ return nativeLogMatcher.get().group(6);
+ }
if (isLogFile(fullFileName)) {
return fullFileName.split("_")[1].split("\\.", 2)[0];
}
@@ -328,6 +336,10 @@ public class FSUtils {
* Get the file extension from the log file.
*/
public static String getFileExtensionFromLog(StoragePath logPath) {
+ Option<Matcher> nativeLogMatcher = matchNativeLogFile(logPath.getName());
+ if (nativeLogMatcher.isPresent()) {
+ return nativeLogMatcher.get().group(8);
+ }
Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName());
if (!matcher.matches()) {
throw new InvalidHoodiePathException(logPath.toString(), "LogFile");
@@ -336,22 +348,19 @@ public class FSUtils {
}
public static String getFileIdFromFileName(String fileName) {
- if (FSUtils.isLogFile(fileName)) {
- Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
- if (!matcher.matches()) {
- throw new InvalidHoodieFileNameException(fileName, "LogFile");
- }
- return matcher.group(1);
+ Option<Matcher> logFileMatcher = matchLogFile(fileName);
+ if (logFileMatcher.isPresent()) {
+ return logFileMatcher.get().group(1);
}
return FSUtils.getFileId(fileName);
}
public static String getFileIdFromLogPath(StoragePath path) {
- Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
- if (!matcher.matches()) {
+ Option<Matcher> logFileMatcher = matchLogFile(path.getName());
+ if (!logFileMatcher.isPresent()) {
throw new InvalidHoodiePathException(path, "LogFile");
}
- return matcher.group(1);
+ return logFileMatcher.get().group(1);
}
public static String getFileIdFromFilePath(StoragePath filePath) {
@@ -365,6 +374,10 @@ public class FSUtils {
* Get the second part of the file name in the log file. That will be the
delta commit time.
*/
public static String getDeltaCommitTimeFromLogPath(StoragePath path) {
+ Option<Matcher> nativeLogMatcher = matchNativeLogFile(path.getName());
+ if (nativeLogMatcher.isPresent()) {
+ return nativeLogMatcher.get().group(6);
+ }
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
if (!matcher.matches()) {
throw new InvalidHoodiePathException(path.toString(), "LogFile");
@@ -376,6 +389,10 @@ public class FSUtils {
* Get TaskPartitionId used in log-path.
*/
public static Integer getTaskPartitionIdFromLogPath(StoragePath path) {
+ Option<Matcher> nativeLogMatcher = matchNativeLogFile(path.getName());
+ if (nativeLogMatcher.isPresent()) {
+ return Integer.parseInt(nativeLogMatcher.get().group(3));
+ }
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
if (!matcher.matches()) {
throw new InvalidHoodiePathException(path.toString(), "LogFile");
@@ -388,6 +405,10 @@ public class FSUtils {
* Get Write-Token used in log-path.
*/
public static String getWriteTokenFromLogPath(StoragePath path) {
+ Option<Matcher> nativeLogMatcher = matchNativeLogFile(path.getName());
+ if (nativeLogMatcher.isPresent()) {
+ return nativeLogMatcher.get().group(2);
+ }
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
if (!matcher.matches()) {
throw new InvalidHoodiePathException(path.toString(), "LogFile");
@@ -399,6 +420,10 @@ public class FSUtils {
* Get StageId used in log-path.
*/
public static Integer getStageIdFromLogPath(StoragePath path) {
+ Option<Matcher> nativeLogMatcher = matchNativeLogFile(path.getName());
+ if (nativeLogMatcher.isPresent()) {
+ return Integer.parseInt(nativeLogMatcher.get().group(4));
+ }
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
if (!matcher.matches()) {
throw new InvalidHoodiePathException(path.toString(), "LogFile");
@@ -411,6 +436,10 @@ public class FSUtils {
* Get Task Attempt Id used in log-path.
*/
public static Integer getTaskAttemptIdFromLogPath(StoragePath path) {
+ Option<Matcher> nativeLogMatcher = matchNativeLogFile(path.getName());
+ if (nativeLogMatcher.isPresent()) {
+ return Integer.parseInt(nativeLogMatcher.get().group(5));
+ }
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
if (!matcher.matches()) {
throw new InvalidHoodiePathException(path.toString(), "LogFile");
@@ -427,6 +456,10 @@ public class FSUtils {
}
public static int getFileVersionFromLog(String logFileName) {
+ Option<Matcher> nativeLogMatcher = matchNativeLogFile(logFileName);
+ if (nativeLogMatcher.isPresent()) {
+ return Integer.parseInt(nativeLogMatcher.get().group(7));
+ }
Matcher matcher = LOG_FILE_PATTERN.matcher(logFileName);
if (!matcher.matches()) {
throw new HoodieIOException("Invalid log file name: " + logFileName);
@@ -443,6 +476,9 @@ public class FSUtils {
}
public static boolean isBaseFile(StoragePath path) {
+ if (matchNativeLogFile(path.getName()).isPresent()) {
+ return false;
+ }
String extension = getFileExtension(path.getName());
if (HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(extension)) {
return BASE_FILE_PATTERN.matcher(path.getName()).matches();
@@ -466,6 +502,9 @@ public class FSUtils {
}
public static boolean isLogFile(String fileName) {
+ if (matchNativeLogFile(fileName).isPresent()) {
+ return true;
+ }
if (fileName.startsWith(LOG_FILE_START_WITH_CHARACTER)) {
Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
return matcher.matches() && matcher.group(3).equals(LOG_FILE_EXTENSION);
@@ -473,6 +512,42 @@ public class FSUtils {
return false;
}
+ public static Option<Matcher> matchNativeLogFile(String fileName) {
+ if (StringUtils.isNullOrEmpty(fileName)) {
+ return Option.empty();
+ }
+ String actualFileName = fileName.contains(StoragePath.SEPARATOR)
+ ? fileName.substring(fileName.lastIndexOf(StoragePath.SEPARATOR) + 1)
+ : fileName;
+ Matcher matcher = NATIVE_LOG_FILE_PATTERN.matcher(actualFileName);
+ return matcher.matches() ? Option.of(matcher) : Option.empty();
+ }
+
+ public static boolean isNativeDeleteLogFile(String fileName) {
+ return matchNativeLogFile(fileName).map(matcher ->
"deletes".equals(matcher.group(8))).orElse(false);
+ }
+
+ public static boolean isCDCLogFile(String fileName) {
+ if (StringUtils.isNullOrEmpty(fileName)) {
+ return false;
+ }
+ Option<Matcher> nativeLogMatcher = matchNativeLogFile(fileName);
+ if (nativeLogMatcher.isPresent()) {
+ return
HoodieCDCUtils.CDC_LOGFILE_SUFFIX.substring(1).equals(nativeLogMatcher.get().group(8));
+ }
+ Matcher matcher = LOG_FILE_PATTERN.matcher(getFileNameFromPath(fileName));
+ return matcher.matches() &&
HoodieCDCUtils.CDC_LOGFILE_SUFFIX.equals(matcher.group(10));
+ }
+
+ private static Option<Matcher> matchLogFile(String fileName) {
+ Option<Matcher> nativeLogMatcher = matchNativeLogFile(fileName);
+ if (nativeLogMatcher.isPresent()) {
+ return nativeLogMatcher;
+ }
+ Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
+ return matcher.matches() ? Option.of(matcher) : Option.empty();
+ }
+
public static boolean isDataFile(StoragePath path) {
return isBaseFile(path) || isLogFile(path);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
index 20fb395d401e..eff4205eb2e6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
@@ -19,7 +19,7 @@
package org.apache.hudi.common.model;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.InvalidHoodiePathException;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
@@ -30,7 +30,10 @@ import lombok.Setter;
import lombok.ToString;
import java.io.Serializable;
+import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
import java.util.regex.Matcher;
import static org.apache.hudi.common.fs.FSUtils.LOG_FILE_PATTERN;
@@ -49,10 +52,20 @@ public class HoodieLogFile implements Serializable {
public static final String DELTA_EXTENSION = ".log";
public static final String LOG_FILE_PREFIX = ".";
public static final Integer LOGFILE_BASE_VERSION = 1;
+ private static final Map<String, Integer> EXTENSION_PRECEDENCE;
private static final Comparator<HoodieLogFile> LOG_FILE_COMPARATOR = new
LogFileComparator();
private static final Comparator<HoodieLogFile> LOG_FILE_COMPARATOR_REVERSED
= new LogFileComparator().reversed();
+ static {
+ Map<String, Integer> extensionPrecedence = new HashMap<>();
+ extensionPrecedence.put("log", 0);
+ extensionPrecedence.put("deletes", 1); // the deletes come after logs to
ensure commit time sequence.
+ extensionPrecedence.put("cdc", 2);
+ extensionPrecedence.put("archive", 3);
+ EXTENSION_PRECEDENCE = Collections.unmodifiableMap(extensionPrecedence);
+ }
+
@Getter
@Setter
private transient StoragePathInfo pathInfo;
@@ -109,6 +122,17 @@ public class HoodieLogFile implements Serializable {
}
private void parseFieldsFromPath() {
+ Option<Matcher> nativeLogMatcherOpt =
FSUtils.matchNativeLogFile(getPath().getName());
+ if (nativeLogMatcherOpt.isPresent()) {
+ Matcher matcher = nativeLogMatcherOpt.get();
+ this.fileId = matcher.group(1);
+ this.deltaCommitTime = matcher.group(6);
+ this.fileExtension = matcher.group(8);
+ this.logVersion = Integer.parseInt(matcher.group(7));
+ this.logWriteToken = matcher.group(2);
+ this.suffix = matcher.group(9);
+ return;
+ }
Matcher matcher = LOG_FILE_PATTERN.matcher(getPath().getName());
if (!matcher.matches()) {
throw new InvalidHoodiePathException(path, "LogFile");
@@ -157,7 +181,7 @@ public class HoodieLogFile implements Serializable {
}
public boolean isCDC() {
- return getSuffix().equals(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
+ return FSUtils.isCDCLogFile(getFileName());
}
public String getSuffix() {
@@ -223,8 +247,13 @@ public class HoodieLogFile implements Serializable {
int compareWriteToken =
getWriteTokenComparator().compare(o1.getLogWriteToken(), o2.getLogWriteToken());
if (compareWriteToken == 0) {
- // Compare by suffix when write token is same
- return o1.getSuffix().compareTo(o2.getSuffix());
+ int p1 = EXTENSION_PRECEDENCE.getOrDefault(o1.getFileExtension(),
Integer.MAX_VALUE);
+ int p2 = EXTENSION_PRECEDENCE.getOrDefault(o2.getFileExtension(),
Integer.MAX_VALUE);
+ if (p1 == p2) {
+ // Compare by suffix when file extension is same
+ return o1.getSuffix().compareTo(o2.getSuffix());
+ }
+ return Integer.compare(p1, p2);
}
// Compare by write token when delta-commit and log-version is same
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemas.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemas.java
new file mode 100644
index 000000000000..bb9ce345145d
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemas.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.schema;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.common.schema.HoodieSchemaUtils.createNewSchemaField;
+
+/**
+ * Factory class for {@link HoodieSchema}.
+ */
+public class HoodieSchemas {
+ public static final String DELETE_LOG_RECORD_KEY_FIELD = "record_key";
+
+ public static HoodieSchema createDeleteLogSchema(HoodieSchema tableSchema,
List<String> orderingFieldNames) {
+ List<HoodieSchemaField> fields = Stream.concat(
+ Stream.of(createNewSchemaField(
+ DELETE_LOG_RECORD_KEY_FIELD,
HoodieSchema.create(HoodieSchemaType.STRING), null, null)),
+ orderingFieldNames.stream().map(orderingFieldName ->
tableSchema.getField(orderingFieldName)
+ .map(HoodieSchemaUtils::createNewSchemaField)
+ .orElseThrow(() ->
+ new IllegalArgumentException("Ordering field " +
orderingFieldName + " not found in table schema"))))
+ .collect(Collectors.toList());
+ return HoodieSchema.createRecord("hudi_delete_log_record", null, null,
fields);
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 0b536b511db1..f5812ced4f47 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -51,7 +51,6 @@ import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
-import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -70,7 +69,7 @@ import java.util.stream.Stream;
* in Spark and {@code RowData} in Flink.
*/
@AllArgsConstructor
-public final class HoodieFileGroupReader<T> implements Closeable {
+public final class HoodieFileGroupReader<T> implements HoodieRecordReader<T> {
private final HoodieReaderContext<T> readerContext;
private final HoodieTableMetaClient metaClient;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieRecordReader.java
new file mode 100644
index 000000000000..5a60fd47872a
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieRecordReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Common readable surface for record readers over a Hudi file group.
+ *
+ * @param <T> The type of engine-specific record representation.
+ */
+public interface HoodieRecordReader<T> extends Closeable {
+
+ /**
+ * Returns buffered records carrying Hudi metadata needed by merge/write
paths.
+ */
+ ClosableIterator<BufferedRecord<T>> getClosableBufferedRecordIterator()
throws IOException;
+
+ /**
+ * Returns final engine-specific records.
+ */
+ ClosableIterator<T> getClosableIterator() throws IOException;
+
+ /**
+ * Returns final records wrapped as {@link HoodieRecord}s.
+ */
+ ClosableIterator<HoodieRecord<T>> getClosableHoodieRecordIterator() throws
IOException;
+
+ /**
+ * Returns only record keys from the file group.
+ */
+ ClosableIterator<String> getClosableKeyIterator() throws IOException;
+
+ /**
+ * Returns records that come from log files only.
+ */
+ ClosableIterator<BufferedRecord<T>> getLogRecordsOnly() throws IOException;
+
+ /**
+ * Returns read statistics collected by this reader.
+ */
+ HoodieReadStats getReadStats();
+
+ /**
+ * Notifies the reader that writing a record failed so callbacks can run
cleanup or bookkeeping.
+ *
+ * @param recordKey key of the failed record
+ */
+ void onWriteFailure(String recordKey);
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
index a0f1a3d04e90..6b8761db5f7c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
@@ -77,8 +76,8 @@ public class InputSplit {
if (logFileStream != null) {
// Process Log Files (if provided)
this.logFiles = logFileStream
+ .filter(logFile -> !logFile.isCDC())
.sorted(HoodieLogFile.getLogFileComparator())
- .filter(logFile ->
!logFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
.collect(Collectors.toList());
this.recordIterator = Option.empty();
} else if (recordIterator != null) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/HoodieLsmFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/HoodieLsmFileGroupReader.java
new file mode 100644
index 000000000000..507cbe46c708
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/HoodieLsmFileGroupReader.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.BaseFileUpdateCallback;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.HoodieRecordReader;
+import org.apache.hudi.common.table.read.InputSplit;
+import org.apache.hudi.common.table.read.IteratorMode;
+import org.apache.hudi.common.table.read.ParquetRowIndexBasedSchemaHandler;
+import org.apache.hudi.common.table.read.ReaderParameters;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import lombok.Builder;
+import lombok.Getter;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.UnaryOperator;
+import java.util.stream.Stream;
+
+/**
+ * Record reader for RFC-103 LSM file groups backed by native parquet log
files.
+ *
+ * <p>This reader is intentionally separate from {@code
HoodieFileGroupReader}. Callers should use it
+ * only when the file group follows pure LSM sorted-file semantics: the
optional base file is treated
+ * as the L1 sorted run and native parquet log files are treated as L0 sorted
runs. Mixed legacy log
+ * file groups should continue to use {@code HoodieFileGroupReader}.
+ *
+ * <p>The reader owns file-group level setup that mirrors {@code
HoodieFileGroupReader}: schema
+ * handling, merge properties, iterator mode, output projection, read stats,
and update callbacks.
+ * The actual sorted k-way merge is delegated to {@link
LsmFileGroupRecordIterator}.
+ */
+public final class HoodieLsmFileGroupReader<T> implements
HoodieRecordReader<T> {
+
+ private final HoodieReaderContext<T> readerContext;
+ private final HoodieTableMetaClient metaClient;
+ private final InputSplit inputSplit;
+ private final List<String> orderingFieldNames;
+ private final HoodieStorage storage;
+ private final TypedProperties props;
+ private final ReaderParameters readerParameters;
+ private final Option<UnaryOperator<T>> outputConverter;
+ private final Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback;
+ private ClosableIterator<BufferedRecord<T>> lsmRecordIterator;
+ @Getter
+ private final HoodieReadStats readStats;
+
+ @Builder(setterPrefix = "with")
+ private HoodieLsmFileGroupReader(
+ HoodieReaderContext<T> readerContext,
+ String latestCommitTime,
+ HoodieSchema dataSchema,
+ HoodieSchema requestedSchema,
+ Option<InternalSchema> internalSchemaOpt,
+ HoodieTableMetaClient hoodieTableMetaClient,
+ TypedProperties props,
+ Option<HoodieBaseFile> baseFileOption,
+ Stream<HoodieLogFile> logFiles,
+ String partitionPath,
+ Long start,
+ Long length,
+ Boolean allowInflightInstants,
+ Boolean emitDelete,
+ Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) {
+
+ ValidationUtils.checkArgument(readerContext != null, "Reader context is
required");
+ ValidationUtils.checkArgument(hoodieTableMetaClient != null, "Hoodie table
meta client is required");
+ ValidationUtils.checkArgument(latestCommitTime != null, "Latest commit
time is required");
+ ValidationUtils.checkArgument(dataSchema != null, "Data schema is
required");
+ ValidationUtils.checkArgument(requestedSchema != null, "Requested schema
is required");
+ ValidationUtils.checkArgument(props != null, "Props is required");
+ ValidationUtils.checkArgument(partitionPath != null, "Partition path is
required");
+
ValidationUtils.checkArgument(hoodieTableMetaClient.getTableConfig().getLogFileFormat()
== HoodieFileFormat.PARQUET,
+ "LSM file group reader expects parquet log files");
+
+ if (internalSchemaOpt == null) {
+ internalSchemaOpt = Option.empty();
+ }
+ if (baseFileOption == null) {
+ baseFileOption = Option.empty();
+ }
+ if (start == null) {
+ start = 0L;
+ }
+ if (length == null) {
+ length = Long.MAX_VALUE;
+ }
+ if (allowInflightInstants == null) {
+ allowInflightInstants = false;
+ }
+ if (emitDelete == null) {
+ emitDelete = false;
+ }
+ if (fileGroupUpdateCallback == null) {
+ fileGroupUpdateCallback = Option.empty();
+ }
+
+ String tablePath = hoodieTableMetaClient.getBasePath().toString();
+ HoodieStorage storage = hoodieTableMetaClient.getStorage().newInstance(new
StoragePath(tablePath), readerContext.getStorageConfiguration());
+
+ this.readerParameters = ReaderParameters.builder()
+ .shouldUseRecordPosition(false)
+ .emitDeletes(emitDelete)
+ .sortOutputs(false)
+ .inflightInstantsAllowed(allowInflightInstants)
+ .build();
+ this.inputSplit = InputSplit.builder()
+ .baseFileOption(baseFileOption)
+ .logFileStream(logFiles)
+ .partitionPath(partitionPath)
+ .start(start)
+ .length(length)
+ .build();
+
+ this.readerContext = readerContext;
+ this.fileGroupUpdateCallback = fileGroupUpdateCallback;
+ this.metaClient = hoodieTableMetaClient;
+ this.storage = storage;
+
+ readerContext.setHasLogFiles(this.inputSplit.hasLogFiles());
+
readerContext.getRecordContext().setPartitionPath(inputSplit.getPartitionPath());
+ if (readerContext.getHasLogFiles() && inputSplit.getStart() != 0) {
+ throw new IllegalArgumentException("LSM file group reader is doing log
file merge but not reading from the start of the base file");
+ }
+ HoodieTableConfig tableConfig = hoodieTableMetaClient.getTableConfig();
+ this.props = ConfigUtils.getMergeProps(props, tableConfig);
+ readerContext.initRecordMerger(props);
+ readerContext.setTablePath(tablePath);
+ readerContext.setLatestCommitTime(latestCommitTime);
+ readerContext.setShouldMergeUseRecordPosition(false);
+
readerContext.setHasBootstrapBaseFile(inputSplit.getBaseFileOption().flatMap(HoodieBaseFile::getBootstrapBaseFile).isPresent());
+
readerContext.setSchemaHandler(readerContext.getRecordContext().supportsParquetRowIndex()
+ ? new ParquetRowIndexBasedSchemaHandler<>(readerContext, dataSchema,
requestedSchema, internalSchemaOpt, props, metaClient)
+ : new FileGroupReaderSchemaHandler<>(readerContext, dataSchema,
requestedSchema, internalSchemaOpt, props, metaClient));
+ this.outputConverter =
readerContext.getSchemaHandler().getOutputConverter();
+ this.orderingFieldNames =
HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(),
hoodieTableMetaClient);
+ this.readStats = new HoodieReadStats();
+ }
+
+ /**
+ * Creates a buffered iterator in the requested output mode.
+ *
+ * <p>{@code includeBaseFile} controls whether the L1/base sorted run
participates in the merge.
+ * Log-only consumers, such as compaction-style readers, pass {@code false}
so only native parquet
+ * log files are scanned.
+ */
+ private ClosableIterator<BufferedRecord<T>>
getBufferedRecordIterator(IteratorMode iteratorMode,
+
boolean includeBaseFile) throws IOException {
+ this.readerContext.setIteratorMode(iteratorMode);
+ if (lsmRecordIterator != null) {
+ lsmRecordIterator.close();
+ }
+ this.lsmRecordIterator = new LsmFileGroupRecordIterator<>(
+ readerContext, storage, inputSplit, orderingFieldNames, metaClient,
props, readerParameters, readStats, fileGroupUpdateCallback, includeBaseFile);
+ return new HoodieLsmFileGroupReaderIterator<>(this);
+ }
+
+ @Override
+ public ClosableIterator<BufferedRecord<T>>
getClosableBufferedRecordIterator() throws IOException {
+ return getBufferedRecordIterator(IteratorMode.HOODIE_RECORD, true);
+ }
+
+ @Override
+ public ClosableIterator<T> getClosableIterator() throws IOException {
+ return new
CloseableMappingIterator<>(getBufferedRecordIterator(IteratorMode.ENGINE_RECORD,
true), BufferedRecord::getRecord);
+ }
+
+ @Override
+ public ClosableIterator<HoodieRecord<T>> getClosableHoodieRecordIterator()
throws IOException {
+ return new
CloseableMappingIterator<>(getBufferedRecordIterator(IteratorMode.HOODIE_RECORD,
true),
+ bufferedRecord ->
readerContext.getRecordContext().constructFinalHoodieRecord(bufferedRecord));
+ }
+
+ @Override
+ public ClosableIterator<String> getClosableKeyIterator() throws IOException {
+ return new
CloseableMappingIterator<>(getBufferedRecordIterator(IteratorMode.RECORD_KEY,
true), BufferedRecord::getRecordKey);
+ }
+
+ @Override
+ public ClosableIterator<BufferedRecord<T>> getLogRecordsOnly() throws
IOException {
+ return getBufferedRecordIterator(IteratorMode.HOODIE_RECORD, false);
+ }
+
+ boolean hasNext() {
+ return lsmRecordIterator.hasNext();
+ }
+
+ BufferedRecord<T> next() {
+ BufferedRecord<T> nextVal = lsmRecordIterator.next();
+ if (outputConverter.isPresent()) {
+ return nextVal.project(outputConverter.get());
+ }
+ return nextVal;
+ }
+
+ @Override
+ public void onWriteFailure(String recordKey) {
+ this.fileGroupUpdateCallback.ifPresent(callback ->
callback.onFailure(recordKey));
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (lsmRecordIterator != null) {
+ lsmRecordIterator.close();
+ }
+ }
+
+ private static class HoodieLsmFileGroupReaderIterator<T> implements
ClosableIterator<BufferedRecord<T>> {
+ private HoodieLsmFileGroupReader<T> reader;
+
+ private HoodieLsmFileGroupReaderIterator(HoodieLsmFileGroupReader<T>
reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return reader.hasNext();
+ }
+
+ @Override
+ public BufferedRecord<T> next() {
+ return reader.next();
+ }
+
+ @Override
+ public void close() {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to close the reader", e);
+ } finally {
+ this.reader = null;
+ }
+ }
+ }
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java
new file mode 100644
index 000000000000..6b162240baee
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemas;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.BaseFileUpdateCallback;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.table.read.BufferedRecords;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.InputSplit;
+import org.apache.hudi.common.table.read.ReaderParameters;
+import org.apache.hudi.common.table.read.UpdateProcessor;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+
+import static
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
+import static
org.apache.hudi.common.config.HoodieReaderConfig.LSM_SORT_MERGE_SPILL_THRESHOLD;
+import static
org.apache.hudi.common.schema.HoodieSchemaCompatibility.areSchemasProjectionEquivalent;
+import static
org.apache.hudi.io.util.FileIOUtils.getDefaultSpillableMapBasePath;
+
+/**
+ * Streaming sorted-merge iterator for RFC-103 LSM file groups.
+ *
+ * <p>The iterator merges one optional L1/base sorted run with zero or more
L0/native parquet log
+ * runs. Every input file must already be sorted by record key; this class
does not sort records
+ * within a file. The active head record from each participating run is
tracked by a loser tree,
+ * which provides efficient k-way merge behavior while preserving
deterministic source ordering for
+ * records with the same key.
+ *
+ * <p>Merge order follows the same conflict resolution model as {@code
HoodieFileGroupReader}: the
+ * L1/base file is processed first, and L0 log files are processed in
file-group log order so newer
+ * log instants or versions can win when ordering values tie. Native delete
logs contain only delete
+ * metadata and are converted into {@link BufferedRecord} delete records
before entering the merge.
+ *
+ * <p>To reduce open-reader memory pressure when a file group has many L0
runs, the iterator can spill
+ * selected L0 file iterators to sequential temporary files. Spilling changes
how an input run is
+ * buffered, but not its merge order or merge semantics. The L1/base iterator
is always kept direct,
+ * and native delete logs plus smaller L0 files are prioritized for direct
reading.
+ */
+public class LsmFileGroupRecordIterator<T> implements
ClosableIterator<BufferedRecord<T>> {
+
+ private final HoodieReaderContext<T> readerContext;
+ private final HoodieStorage storage;
+ private final InputSplit inputSplit;
+ private final HoodieSchema readerSchema;
+ private final List<String> orderingFieldNames;
+ private final boolean includeBaseFile;
+ private final BufferedRecordMerger<T> bufferedRecordMerger;
+ private final UpdateProcessor<T> updateProcessor;
+ private final LoserTree<T> readers;
+ private final int spillThreshold;
+ private final String spillBasePath;
+ private BufferedRecord<T> nextRecord;
+
+ /**
+ * Creates an iterator that merges both the L1/base file, when present, and
all L0 native log files.
+ */
+ public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext,
+ HoodieStorage storage,
+ InputSplit inputSplit,
+ List<String> orderingFieldNames,
+ HoodieTableMetaClient metaClient,
+ TypedProperties props,
+ ReaderParameters readerParameters,
+ HoodieReadStats readStats,
+ Option<BaseFileUpdateCallback<T>>
fileGroupUpdateCallback) throws IOException {
+ this(readerContext, storage, inputSplit, orderingFieldNames, metaClient,
props, readerParameters, readStats, fileGroupUpdateCallback, true);
+ }
+
+ /**
+ * Creates an iterator over an LSM file group.
+ *
+ * @param includeBaseFile whether the L1/base file should be included in the
merge. Passing
+ * {@code false} produces a log-only view for callers
that only need L0 data.
+ */
+ public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext,
+ HoodieStorage storage,
+ InputSplit inputSplit,
+ List<String> orderingFieldNames,
+ HoodieTableMetaClient metaClient,
+ TypedProperties props,
+ ReaderParameters readerParameters,
+ HoodieReadStats readStats,
+ Option<BaseFileUpdateCallback<T>>
fileGroupUpdateCallback,
+ boolean includeBaseFile) throws
IOException {
+ this.readerContext = readerContext;
+ this.storage = storage;
+ this.inputSplit = inputSplit;
+ this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
+ this.orderingFieldNames = orderingFieldNames;
+ this.includeBaseFile = includeBaseFile;
+ this.bufferedRecordMerger = BufferedRecordMergerFactory.create(
+ readerContext, readerContext.getMergeMode(), false,
readerContext.getRecordMerger(),
+ readerSchema, readerContext.getPayloadClasses(props), props,
metaClient.getTableConfig().getPartialUpdateMode());
+ this.updateProcessor = UpdateProcessor.create(readStats, readerContext,
readerParameters.isEmitDeletes(), fileGroupUpdateCallback, props);
+ this.spillThreshold = Math.max(0,
props.getInteger(LSM_SORT_MERGE_SPILL_THRESHOLD.key(),
LSM_SORT_MERGE_SPILL_THRESHOLD.defaultValue()));
+ this.spillBasePath = props.getString(SPILLABLE_MAP_BASE_PATH.key(),
getDefaultSpillableMapBasePath());
+ this.readers = new LoserTree<>(initializeReaders());
+ }
+
+ /**
+  * Builds one sorted-run reader for each sorted run that has at least one record.
+  *
+  * <p>The assigned {@code mergeOrder} is the stable source precedence used when multiple runs
+  * expose the same key. It is assigned before spill selection so direct and spilled iterators
+  * remain semantically identical during the loser-tree merge.
+  *
+  * <p>If opening, spilling or priming any run fails, every reader already registered in the
+  * result list is closed before the failure is rethrown, so a partially initialized merge does
+  * not leave open file iterators or spill files behind. Close failures are attached to the
+  * original exception as suppressed exceptions.
+  *
+  * @return the non-empty sorted-run readers, each positioned on its first record
+  * @throws IOException if a file iterator cannot be created
+  */
+ private List<SortedRunReader<T>> initializeReaders() throws IOException {
+   List<SortedRunReader<T>> sortedRunReaders = new ArrayList<>();
+   try {
+     int mergeOrder = 0;
+     boolean hasBaseFileReader = includeBaseFile && inputSplit.getBaseFileOption().isPresent();
+     if (hasBaseFileReader) {
+       addReader(sortedRunReaders, mergeOrder++, createBaseFileIterator(inputSplit.getBaseFileOption().get()));
+     }
+
+     List<LogReaderSpec> logReaderSpecs = new ArrayList<>();
+     for (HoodieLogFile logFile : inputSplit.getLogFiles()) {
+       logReaderSpecs.add(new LogReaderSpec(mergeOrder++, logFile));
+     }
+     Set<Integer> directLogMergeOrders = selectDirectLogMergeOrders(logReaderSpecs, hasBaseFileReader);
+     for (LogReaderSpec spec : logReaderSpecs) {
+       ClosableIterator<BufferedRecord<T>> iterator =
+           createFileIterator(spec.logFile.getPathInfo(), spec.logFile.getPath(), spec.logFile.getFileSize());
+       addReader(sortedRunReaders, spec.mergeOrder,
+           maybeSpillIterator(directLogMergeOrders.contains(spec.mergeOrder), iterator));
+     }
+     return sortedRunReaders;
+   } catch (IOException | RuntimeException e) {
+     // The caller never receives the list on failure, so release what was opened so far here.
+     for (SortedRunReader<T> opened : sortedRunReaders) {
+       try {
+         opened.close();
+       } catch (RuntimeException closeFailure) {
+         e.addSuppressed(closeFailure);
+       }
+     }
+     throw e;
+   }
+ }
+
+ /**
+ * Selects which L0 log readers stay direct under the configured spill
threshold.
+ *
+ * <p>The base/L1 reader consumes one direct-reader slot when present and is
never spilled. Remaining
+ * direct-reader budget is spent on native delete logs first, then smaller
log files, because those
+ * readers tend to be cheaper to keep open while avoiding unnecessary spill
materialization.
+ */
+ private Set<Integer> selectDirectLogMergeOrders(List<LogReaderSpec>
logReaderSpecs, boolean hasBaseFileReader) {
+ return selectDirectLogMergeOrders(logReaderSpecs, hasBaseFileReader,
spillThreshold);
+ }
+
+ /**
+  * Picks the merge orders of the L0 log readers that are read directly rather than spilled.
+  *
+  * <p>The budget is {@code spillThreshold}, reduced by one when a base-file reader is present.
+  * Candidates are ranked native delete logs first, then by ascending file size, then by ascending
+  * merge order, and the first {@code budget} candidates are selected.
+  *
+  * @return the selected merge orders; empty when the budget is zero or negative
+  */
+ @VisibleForTesting
+ static Set<Integer> selectDirectLogMergeOrders(List<LogReaderSpec> logReaderSpecs,
+                                                boolean hasBaseFileReader,
+                                                int spillThreshold) {
+   Set<Integer> directMergeOrders = new HashSet<>();
+   int directLogBudget = hasBaseFileReader ? spillThreshold - 1 : spillThreshold;
+   if (directLogBudget <= 0) {
+     return directMergeOrders;
+   }
+   Comparator<LogReaderSpec> directReadPriority = Comparator
+       .comparing((LogReaderSpec spec) -> !spec.nativeDeleteLog)
+       .thenComparingLong(spec -> spec.fileSize)
+       .thenComparingInt(spec -> spec.mergeOrder);
+   // Rank a copy so the caller's list keeps its merge order.
+   List<LogReaderSpec> rankedSpecs = new ArrayList<>(logReaderSpecs);
+   rankedSpecs.sort(directReadPriority);
+   int directCount = Math.min(directLogBudget, rankedSpecs.size());
+   for (LogReaderSpec spec : rankedSpecs.subList(0, directCount)) {
+     directMergeOrders.add(spec.mergeOrder);
+   }
+   return directMergeOrders;
+ }
+
+ /**
+ * Returns the original iterator when it is selected for direct reading,
otherwise materializes it
+ * into a sequential spill iterator.
+ */
+ private ClosableIterator<BufferedRecord<T>> maybeSpillIterator(boolean
directReader,
+
ClosableIterator<BufferedRecord<T>> iterator) {
+ if (directReader) {
+ return iterator;
+ }
+ return new SpillableLsmRecordIterator<>(iterator,
readerContext.getRecordSerializer(), readerContext.getRecordContext(),
spillBasePath);
+ }
+
+ /**
+ * Metadata used only for choosing the direct-versus-spilled L0 reader plan.
+ */
+ @VisibleForTesting
+ static class LogReaderSpec {
+ final int mergeOrder;
+ final HoodieLogFile logFile;
+ final boolean nativeDeleteLog;
+ final long fileSize;
+
+ LogReaderSpec(int mergeOrder, HoodieLogFile logFile) {
+ this.mergeOrder = mergeOrder;
+ this.logFile = logFile;
+ this.nativeDeleteLog =
FSUtils.isNativeDeleteLogFile(logFile.getFileName());
+ this.fileSize = logFile.getFileSize() >= 0 ? logFile.getFileSize() :
Long.MAX_VALUE;
+ }
+ }
+
+ /**
+  * Adds a reader to the merge only when the underlying sorted run contains at least one record.
+  *
+  * <p>An empty run is closed immediately. If reading the first record fails, the iterator is
+  * closed before the failure propagates, because at that point it has not been added to
+  * {@code sortedRunReaders} and nothing else would close it.
+  *
+  * @param sortedRunReaders list that receives the reader when the run is non-empty
+  * @param mergeOrder source precedence assigned to this run
+  * @param iterator sorted run to wrap
+  */
+ private void addReader(List<SortedRunReader<T>> sortedRunReaders, int mergeOrder, ClosableIterator<BufferedRecord<T>> iterator) {
+   SortedRunReader<T> sortedRunReader = new SortedRunReader<>(mergeOrder, iterator);
+   boolean hasRecord;
+   try {
+     hasRecord = sortedRunReader.advance();
+   } catch (RuntimeException e) {
+     try {
+       sortedRunReader.close();
+     } catch (RuntimeException closeFailure) {
+       e.addSuppressed(closeFailure);
+     }
+     throw e;
+   }
+   if (hasRecord) {
+     sortedRunReaders.add(sortedRunReader);
+   } else {
+     sortedRunReader.close();
+   }
+ }
+
+ /**
+ * Creates the L1/base sorted-run iterator.
+ */
+ private ClosableIterator<BufferedRecord<T>>
createBaseFileIterator(HoodieBaseFile baseFile) throws IOException {
+ if (baseFile.getBootstrapBaseFile().isPresent()) {
+ // Bootstrap base files require joining the skeleton file with the
external data file.
+ // Keep that path on HoodieFileGroupReader until the LSM reader
implements the same merge.
+ throw new UnsupportedOperationException("LSM file group reader does not
support bootstrap base files");
+ }
+ return createFileIterator(baseFile.getPathInfo(),
baseFile.getStoragePath(), baseFile.getFileSize());
+ }
+
+ /**
+ * Creates a sorted-run iterator for a parquet data file or a native parquet
log file.
+ *
+ * <p>Native delete logs use a specialized schema and are routed through
+ * {@link #createNativeDeleteLogIterator(StoragePathInfo, StoragePath,
long)}.
+ */
+ private ClosableIterator<BufferedRecord<T>>
createFileIterator(StoragePathInfo pathInfo,
+ StoragePath
path,
+ long
fileSize) throws IOException {
+ StoragePath storagePath = pathInfo != null ? pathInfo.getPath() : path;
+ if (FSUtils.isNativeDeleteLogFile(storagePath.getName())) {
+ return createNativeDeleteLogIterator(pathInfo, storagePath, fileSize);
+ }
+ Pair<HoodieSchema, Map<String, String>> requiredSchemaAndRenamedColumns =
+
readerContext.getSchemaHandler().getRequiredSchemaForFileAndRenamedColumns(storagePath);
+ HoodieSchema fileRequiredSchema =
requiredSchemaAndRenamedColumns.getLeft();
+ ClosableIterator<T> recordIterator;
+ if (pathInfo != null) {
+ recordIterator = readerContext.getFileRecordIterator(
+ pathInfo, 0, pathInfo.getLength(),
readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage);
+ } else {
+ long length = fileSize >= 0 ? fileSize :
storage.getPathInfo(storagePath).getLength();
+ recordIterator = readerContext.getFileRecordIterator(
+ storagePath, 0, length,
readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage);
+ }
+ if (!areSchemasProjectionEquivalent(fileRequiredSchema, readerSchema) ||
!requiredSchemaAndRenamedColumns.getRight().isEmpty()) {
+ UnaryOperator<T> projector = readerContext.getRecordContext()
+ .projectRecord(fileRequiredSchema, readerSchema,
requiredSchemaAndRenamedColumns.getRight());
+ recordIterator = new CloseableMappingIterator<>(recordIterator,
projector);
+ }
+ if (readerContext.getInstantRange().isPresent()) {
+ recordIterator = readerContext.applyInstantRangeFilter(recordIterator);
+ }
+ return new CloseableMappingIterator<>(recordIterator, record ->
BufferedRecords.fromEngineRecord(
+ readerContext.getRecordContext().seal(record),
+ readerSchema,
+ readerContext.getRecordContext(),
+ orderingFieldNames,
+ readerContext.getRecordContext().isDeleteRecord(record,
readerContext.getSchemaHandler().getDeleteContext().withReaderSchema(readerSchema))));
+ }
+
+ /**
+ * Creates delete records from an RFC-103 native delete log.
+ *
+ * <p>The delete log schema intentionally contains only the record key and
ordering fields;
+ * partition path and full data columns are not read for delete-only logs.
+ */
+ private ClosableIterator<BufferedRecord<T>>
createNativeDeleteLogIterator(StoragePathInfo pathInfo,
+
StoragePath storagePath,
+
long fileSize) throws IOException {
+ HoodieSchema deleteLogSchema = HoodieSchemas.createDeleteLogSchema(
+ readerContext.getSchemaHandler().getTableSchema(), orderingFieldNames);
+ ClosableIterator<T> recordIterator;
+ if (pathInfo != null) {
+ recordIterator = readerContext.getFileRecordIterator(
+ pathInfo, 0, pathInfo.getLength(), deleteLogSchema, deleteLogSchema,
storage);
+ } else {
+ long length = fileSize >= 0 ? fileSize :
storage.getPathInfo(storagePath).getLength();
+ recordIterator = readerContext.getFileRecordIterator(
+ storagePath, 0, length, deleteLogSchema, deleteLogSchema, storage);
+ }
+ return new CloseableMappingIterator<>(recordIterator, record -> {
+ return createNativeDeleteRecord(readerContext, record, deleteLogSchema,
orderingFieldNames);
+ });
+ }
+
+ /**
+  * Converts one row of a native delete log into a delete {@link BufferedRecord}.
+  *
+  * @param readerContext reader context whose record context extracts values from {@code record}
+  * @param record engine-specific row read with {@code deleteLogSchema}
+  * @param deleteLogSchema schema used to resolve the record key and ordering fields
+  * @param orderingFieldNames ordering fields used to extract the ordering value
+  * @return a delete record carrying the row's record key and ordering value
+  * @throws NullPointerException if the row has no value for the record key field
+  */
+ @VisibleForTesting
+ static <T> BufferedRecord<T> createNativeDeleteRecord(HoodieReaderContext<T> readerContext,
+                                                       T record,
+                                                       HoodieSchema deleteLogSchema,
+                                                       List<String> orderingFieldNames) {
+   Object recordKey = readerContext.getRecordContext()
+       .getValue(record, deleteLogSchema, HoodieSchemas.DELETE_LOG_RECORD_KEY_FIELD);
+   if (recordKey == null) {
+     // Same exception type as the toString() call below would raise, but with a usable message.
+     throw new NullPointerException("Native delete log record has a null value for record key field "
+         + HoodieSchemas.DELETE_LOG_RECORD_KEY_FIELD);
+   }
+   // Preserve the delete log's ordering value so event-time/custom merge modes can compare
+   // deletes against data records instead of treating every native delete as commit-time ordered.
+   Comparable orderingValue =
+       readerContext.getRecordContext().getOrderingValue(record, deleteLogSchema, orderingFieldNames);
+   return BufferedRecords.createDelete(recordKey.toString(), orderingValue);
+ }
+
+ /**
+  * Reports whether another output record is available, computing and buffering it if needed.
+  *
+  * <p>Each pass merges all versions of the next key and hands the result to the update processor.
+  * A {@code null} result from the processor means the key produces no output, so the loop moves
+  * on to the next key until a record is produced or all readers are exhausted.
+  */
+ @Override
+ public boolean hasNext() {
+ // A record buffered by an earlier call has not been consumed by next() yet.
+ if (nextRecord != null) {
+ return true;
+ }
+ while (!readers.isEmpty()) {
+ BufferedRecord<T> mergedRecord = nextMergedRecord();
+ // NOTE(review): presumably null is returned for deletes that should not be emitted; confirm against UpdateProcessor.
+ nextRecord = updateProcessor.processUpdate(
+ mergedRecord.getRecordKey(), null, mergedRecord,
mergedRecord.isDelete());
+ if (nextRecord != null) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Pops and merges all currently visible versions for the next record key.
+ */
+ private BufferedRecord<T> nextMergedRecord() {
+ BufferedRecord<T> firstRecord = readers.peekWinner();
+ String recordKey = firstRecord.getRecordKey();
+ BufferedRecord<T> mergedRecord = null;
+ while (!readers.isEmpty() &&
recordKey.equals(readers.peekWinner().getRecordKey())) {
+ mergedRecord = merge(mergedRecord, readers.popWinner());
+ }
+ return mergedRecord;
+ }
+
+ /**
+ * Applies the configured buffered-record merger so later sources can win
when ordering values tie.
+ */
+ private BufferedRecord<T> merge(BufferedRecord<T> existingRecord,
BufferedRecord<T> newRecord) {
+ if (existingRecord == null) {
+ return newRecord;
+ }
+ try {
+ return bufferedRecordMerger.deltaMerge(newRecord,
existingRecord).orElse(existingRecord);
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to merge LSM records for key " +
newRecord.getRecordKey(), e);
+ }
+ }
+
+ @Override
+ public BufferedRecord<T> next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ BufferedRecord<T> record = nextRecord;
+ nextRecord = null;
+ return record;
+ }
+
+ @Override
+ public void close() {
+ readers.close();
+ }
+
+ /**
+ * Loser-tree state machine for k-way merging. Each leaf keeps one active
record from
+ * one sorted input stream; {@code tree[0]} stores the current champion and
internal
+ * nodes store the loser from the corresponding tournament match.
+ */
+ @VisibleForTesting
+ static class LoserTree<T> {
+ private final List<SortedRunReader<T>> leaves;
+ private final int leafBase;
+ private final int[] tree;
+ private final int[] winners;
+
+ LoserTree(List<SortedRunReader<T>> leaves) {
+ this.leaves = leaves;
+ this.leafBase = nextPowerOfTwo(Math.max(1, leaves.size()));
+ this.tree = new int[leafBase];
+ this.winners = new int[leafBase << 1];
+ Arrays.fill(tree, -1);
+ Arrays.fill(winners, -1);
+ build();
+ }
+
+ private void build() {
+ for (int i = 0; i < leaves.size(); i++) {
+ winners[leafBase + i] = leaves.get(i).current == null ? -1 : i;
+ }
+ if (leafBase == 1) {
+ tree[0] = winners[leafBase];
+ } else {
+ for (int node = leafBase - 1; node > 0; node--) {
+ replay(node);
+ }
+ }
+ }
+
+ boolean isEmpty() {
+ return tree[0] < 0;
+ }
+
+ BufferedRecord<T> peekWinner() {
+ int winnerIndex = tree[0];
+ return winnerIndex < 0 ? null : leaves.get(winnerIndex).current;
+ }
+
+ BufferedRecord<T> popWinner() {
+ int winnerIndex = tree[0];
+ SortedRunReader<T> winner = leaves.get(winnerIndex);
+ BufferedRecord<T> record = winner.current;
+ if (!winner.advance()) {
+ winner.close();
+ }
+ update(winnerIndex);
+ return record;
+ }
+
+ private void update(int leafIndex) {
+ winners[leafBase + leafIndex] = leaves.get(leafIndex).current == null ?
-1 : leafIndex;
+ if (leafBase == 1) {
+ tree[0] = winners[leafBase];
+ return;
+ }
+ int node = (leafBase + leafIndex) >> 1;
+ while (node > 0) {
+ replay(node);
+ node >>= 1;
+ }
+ }
+
+ /**
+  * Replays the match at internal node {@code node} using the winners recorded for its two
+  * children ({@code 2 * node} and {@code 2 * node + 1}).
+  *
+  * <p>{@code winners[node]} receives the leaf index that wins the match and {@code tree[node]}
+  * the leaf index that loses it; {@code -1} marks an absent contestant. When {@code node} is
+  * the root (index 1) the winner is also copied to {@code tree[0]}.
+  */
+ private void replay(int node) {
+ int left = winners[node << 1];
+ int right = winners[(node << 1) + 1];
+ // With fewer than two live children there is no contest, so no loser is recorded.
+ if (left < 0 && right < 0) {
+ winners[node] = -1;
+ tree[node] = -1;
+ } else if (left < 0) {
+ winners[node] = right;
+ tree[node] = -1;
+ } else if (right < 0) {
+ winners[node] = left;
+ tree[node] = -1;
+ } else {
+ // The smaller record per compare() wins; the left child wins when compare() returns 0.
+ if (compare(left, right) <= 0) {
+ winners[node] = left;
+ tree[node] = right;
+ } else {
+ winners[node] = right;
+ tree[node] = left;
+ }
+ }
+ // Node 1 is the root: publish its winner where isEmpty()/peekWinner()/popWinner() read it.
+ if (node == 1) {
+ tree[0] = winners[node];
+ }
+ }
+
+ private int compare(int leftIndex, int rightIndex) {
+ SortedRunReader<T> left = leaves.get(leftIndex);
+ SortedRunReader<T> right = leaves.get(rightIndex);
+ int keyCompare =
left.current.getRecordKey().compareTo(right.current.getRecordKey());
+ if (keyCompare != 0) {
+ return keyCompare;
+ }
+ // Process older sources first so the regular merger sees later sources
last.
+ // This preserves HoodieFileGroupReader tie semantics when ordering
values are equal:
+ // base < older log instant/version < newer log instant/version.
+ return Integer.compare(left.mergeOrder, right.mergeOrder);
+ }
+
+ /**
+  * Closes every leaf reader.
+  *
+  * <p>A failure to close one leaf does not stop the remaining leaves from being closed. The
+  * first failure is rethrown after all leaves were attempted, with later failures attached as
+  * suppressed exceptions.
+  */
+ private void close() {
+   RuntimeException failure = null;
+   for (SortedRunReader<T> leaf : leaves) {
+     try {
+       leaf.close();
+     } catch (RuntimeException e) {
+       if (failure == null) {
+         failure = e;
+       } else {
+         failure.addSuppressed(e);
+       }
+     }
+   }
+   if (failure != null) {
+     throw failure;
+   }
+ }
+
+ private static int nextPowerOfTwo(int value) {
+ int result = 1;
+ while (result < value) {
+ result <<= 1;
+ }
+ return result;
+ }
+ }
+
+ @VisibleForTesting
+ static class SortedRunReader<T> {
+ private final int mergeOrder;
+ private final ClosableIterator<BufferedRecord<T>> iterator;
+ private BufferedRecord<T> current;
+ private boolean closed;
+
+ SortedRunReader(int mergeOrder, ClosableIterator<BufferedRecord<T>>
iterator) {
+ this.mergeOrder = mergeOrder;
+ this.iterator = iterator;
+ }
+
+ boolean advance() {
+ if (iterator.hasNext()) {
+ current = iterator.next();
+ return true;
+ }
+ current = null;
+ return false;
+ }
+
+ void close() {
+ if (!closed) {
+ iterator.close();
+ closed = true;
+ }
+ }
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/SpillableLsmRecordIterator.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/SpillableLsmRecordIterator.java
new file mode 100644
index 000000000000..8702562024ed
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/SpillableLsmRecordIterator.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.engine.RecordContext;
+import org.apache.hudi.common.serialization.CustomSerializer;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieIOException;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/**
+ * Sequential disk-backed iterator for sorted LSM inputs.
+ *
+ * <p>The source iterator is drained into a length-prefixed spill file and
closed. The resulting
+ * iterator reads the records back sequentially, which matches the loser-tree
access pattern.
+ */
+class SpillableLsmRecordIterator<T> implements
ClosableIterator<BufferedRecord<T>> {
+
+ private static final int BUFFER_SIZE = 128 * 1024;
+ private static final String SPILL_FILE_PREFIX = "hudi-lsm-";
+ private static final String SPILL_FILE_SUFFIX = ".spill";
+
+ private final CustomSerializer<BufferedRecord<T>> serializer;
+ private final RecordContext<T> recordContext;
+ private final File spillFile;
+ private final long recordCount;
+ private DataInputStream inputStream;
+ private long recordsRead;
+ private BufferedRecord<T> nextRecord;
+ private boolean closed;
+
+ SpillableLsmRecordIterator(ClosableIterator<BufferedRecord<T>>
sourceIterator,
+ CustomSerializer<BufferedRecord<T>> serializer,
+ RecordContext<T> recordContext,
+ String spillBasePath) {
+ this.serializer = serializer;
+ this.recordContext = recordContext;
+ Throwable spillFailure = null;
+ try {
+ Path spillDirectory = Paths.get(spillBasePath);
+ Files.createDirectories(spillDirectory);
+ this.spillFile = Files.createTempFile(spillDirectory, SPILL_FILE_PREFIX,
SPILL_FILE_SUFFIX).toFile();
+ this.recordCount = spill(sourceIterator);
+ } catch (IOException e) {
+ spillFailure = e;
+ throw new HoodieIOException("Failed to spill LSM input iterator", e);
+ } catch (RuntimeException e) {
+ spillFailure = e;
+ throw e;
+ } finally {
+ closeSourceIterator(sourceIterator, spillFailure);
+ }
+ }
+
+ /**
+  * Drains {@code sourceIterator} into the spill file as length-prefixed serialized records.
+  *
+  * <p>Each record is written as a 4-byte length followed by the serialized bytes. On failure the
+  * partially written spill file is deleted; if that deletion fails too, the deletion failure is
+  * attached to the original exception as suppressed instead of replacing it. This method does
+  * not close {@code sourceIterator}.
+  *
+  * @param sourceIterator records to write, in iteration order
+  * @return the number of records written
+  * @throws IOException if writing to the spill file fails
+  */
+ private long spill(ClosableIterator<BufferedRecord<T>> sourceIterator) throws IOException {
+   long count = 0;
+   try (DataOutputStream outputStream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(spillFile), BUFFER_SIZE))) {
+     while (sourceIterator.hasNext()) {
+       byte[] bytes = serializer.serialize(sourceIterator.next().toBinary(recordContext));
+       outputStream.writeInt(bytes.length);
+       outputStream.write(bytes);
+       count++;
+     }
+   } catch (IOException | RuntimeException e) {
+     // The output stream is already closed here, so the file can be removed.
+     try {
+       deleteSpillFile();
+     } catch (RuntimeException deleteFailure) {
+       e.addSuppressed(deleteFailure);
+     }
+     throw e;
+   }
+   return count;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (nextRecord != null) {
+ return true;
+ }
+ if (recordsRead >= recordCount) {
+ return false;
+ }
+ try {
+ ensureInputStream();
+ int length = inputStream.readInt();
+ byte[] bytes = new byte[length];
+ inputStream.readFully(bytes);
+ nextRecord = serializer.deserialize(bytes);
+ recordsRead++;
+ return true;
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to read spilled LSM input iterator",
e);
+ }
+ }
+
+ @Override
+ public BufferedRecord<T> next() {
+ if (!hasNext()) {
+ throw new java.util.NoSuchElementException();
+ }
+ BufferedRecord<T> record = nextRecord;
+ nextRecord = null;
+ return record;
+ }
+
+ @Override
+ public void close() {
+ if (closed) {
+ return;
+ }
+ try {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to close spilled LSM input
iterator", e);
+ } finally {
+ deleteSpillFile();
+ closed = true;
+ }
+ }
+
+ private void ensureInputStream() throws IOException {
+ if (inputStream == null) {
+ inputStream = new DataInputStream(new
BufferedInputStream(Files.newInputStream(spillFile.toPath()), BUFFER_SIZE));
+ }
+ }
+
+ private void deleteSpillFile() {
+ try {
+ Files.deleteIfExists(spillFile.toPath());
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to delete spilled LSM input file " +
spillFile, e);
+ }
+ }
+
+ private void closeSourceIterator(ClosableIterator<BufferedRecord<T>>
sourceIterator,
+ Throwable spillFailure) {
+ try {
+ sourceIterator.close();
+ } catch (RuntimeException e) {
+ if (spillFailure != null) {
+ spillFailure.addSuppressed(e);
+ } else {
+ throw e;
+ }
+ }
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index c9468f152fbd..0c68f9ee4215 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -70,7 +70,6 @@ import
java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import java.util.function.Function;
import java.util.function.Predicate;
-import java.util.regex.Matcher;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -509,6 +508,8 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
// 2. file is not .hoodie_partition_metadata
if
(pathName.startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
{
return false;
+ } else if (FSUtils.isLogFile(pathInfo.getPath())) {
+ return false;
} else if (isMultipleBaseFileFormatsEnabled) {
return pathName.contains(HoodieFileFormat.PARQUET.getFileExtension())
|| pathName.contains(HoodieFileFormat.ORC.getFileExtension())
@@ -529,8 +530,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
String logFileExtension =
metaClient.getTableConfig().getLogFileFormat().getFileExtension();
Predicate<StoragePathInfo> rtFilePredicate = pathInfo -> {
String fileName = pathInfo.getPath().getName();
- Matcher matcher = FSUtils.LOG_FILE_PATTERN.matcher(fileName);
- return matcher.matches() && fileName.contains(logFileExtension);
+ return FSUtils.isLogFile(pathInfo.getPath()) &&
fileName.contains(logFileExtension);
};
return
pathInfoList.stream().filter(rtFilePredicate).map(HoodieLogFile::new);
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java
index 616801c7fb75..3285386b8a82 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java
@@ -18,12 +18,18 @@
package org.apache.hudi.common.model;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
+import java.util.List;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieLogFile {
private final String pathStr =
"file:///tmp/hoodie/2021/01/01/.136281f3-c24e-423b-a65a-95dbfbddce1d_100.log.2_1-0-1";
@@ -77,6 +83,145 @@ public class TestHoodieLogFile {
assertFileGetters(pathWithSuffix, null, hoodieLogFile, -1, suffix);
}
+  /**
+   * Parses a log file named {@code <fileId>_<writeToken>_<instant>_<version>.log.parquet} and
+   * checks the components reported by {@link HoodieLogFile} and by the static {@link FSUtils}
+   * helpers.
+   */
+  @Test
+  void createFromNativeParquetLogFile() {
+    StoragePath logPath = new StoragePath("file:///tmp/hoodie/2021/01/01/"
+        + "136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_20250409161256974_2.log.parquet");
+    HoodieLogFile parsed = new HoodieLogFile(logPath);
+    String logFileName = logPath.getName();
+    String expectedInstant = "20250409161256974";
+    String expectedWriteToken = "1-0-1";
+
+    // Classification: a log file, but neither a delete log nor a base file.
+    assertTrue(FSUtils.isLogFile(logPath));
+    assertFalse(FSUtils.isNativeDeleteLogFile(logFileName));
+    assertFalse(FSUtils.isBaseFile(logPath));
+
+    // Components as exposed by the HoodieLogFile getters.
+    assertEquals(fileId, parsed.getFileId());
+    assertEquals(expectedInstant, parsed.getDeltaCommitTime());
+    assertEquals(2, parsed.getLogVersion());
+    assertEquals(expectedWriteToken, parsed.getLogWriteToken());
+    assertEquals("log", parsed.getFileExtension());
+    assertEquals("parquet", parsed.getSuffix());
+
+    // The same components as parsed by the static FSUtils helpers.
+    assertEquals("log", FSUtils.getFileExtensionFromLog(logPath));
+    assertEquals(expectedInstant, FSUtils.getCommitTime(logFileName));
+    assertEquals(expectedInstant, FSUtils.getDeltaCommitTimeFromLogPath(logPath));
+    assertEquals(2, FSUtils.getFileVersionFromLog(logPath));
+    assertEquals(expectedWriteToken, FSUtils.getWriteTokenFromLogPath(logPath));
+
+    // The write token "1-0-1" splits into task partition id, stage id and task attempt id.
+    assertEquals(1, FSUtils.getTaskPartitionIdFromLogPath(logPath));
+    assertEquals(0, FSUtils.getStageIdFromLogPath(logPath));
+    assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(logPath));
+  }
+
+  /**
+   * Parses a log file whose name ends in {@code _3.deletes.parquet} and checks that it is
+   * classified as a native delete log file with extension {@code deletes} and suffix
+   * {@code parquet}.
+   */
+  @Test
+  void createFromNativeDeleteParquetLogFile() {
+    StoragePath deleteLogPath = new StoragePath("file:///tmp/hoodie/2021/01/01/"
+        + "136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_20250409161256974_3.deletes.parquet");
+    HoodieLogFile parsed = new HoodieLogFile(deleteLogPath);
+
+    // Classification: a log file and specifically a delete log, never a base file.
+    assertTrue(FSUtils.isLogFile(deleteLogPath));
+    assertTrue(FSUtils.isNativeDeleteLogFile(deleteLogPath.getName()));
+    assertFalse(FSUtils.isBaseFile(deleteLogPath));
+
+    // Name components.
+    assertEquals(fileId, parsed.getFileId());
+    assertEquals("20250409161256974", parsed.getDeltaCommitTime());
+    assertEquals(3, parsed.getLogVersion());
+    assertEquals("1-0-1", parsed.getLogWriteToken());
+    assertEquals("deletes", parsed.getFileExtension());
+    assertEquals("parquet", parsed.getSuffix());
+    assertEquals("deletes", FSUtils.getFileExtensionFromLog(deleteLogPath));
+  }
+
+  /**
+   * Parses a log file whose name ends in {@code _4.cdc.parquet} and checks that it is reported
+   * as a CDC log file with extension {@code cdc} and suffix {@code parquet}.
+   */
+  @Test
+  void createFromNativeCdcParquetLogFile() {
+    // The raw string is kept because FSUtils.isCDCLogFile is called with it below.
+    String cdcLogPathStr = "file:///tmp/hoodie/2021/01/01/"
+        + "136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_20250409161256974_4.cdc.parquet";
+    StoragePath cdcLogPath = new StoragePath(cdcLogPathStr);
+    HoodieLogFile parsed = new HoodieLogFile(cdcLogPath);
+
+    // Classification: a log file, not a delete log, not a base file.
+    assertTrue(FSUtils.isLogFile(cdcLogPath));
+    assertFalse(FSUtils.isNativeDeleteLogFile(cdcLogPath.getName()));
+    assertFalse(FSUtils.isBaseFile(cdcLogPath));
+
+    // Name components.
+    assertEquals(fileId, parsed.getFileId());
+    assertEquals("20250409161256974", parsed.getDeltaCommitTime());
+    assertEquals(4, parsed.getLogVersion());
+    assertEquals("1-0-1", parsed.getLogWriteToken());
+    assertEquals("cdc", parsed.getFileExtension());
+    assertEquals("parquet", parsed.getSuffix());
+    assertEquals("cdc", FSUtils.getFileExtensionFromLog(cdcLogPath));
+
+    // CDC detection through both the instance and the static entry point.
+    assertTrue(parsed.isCDC());
+    assertTrue(FSUtils.isCDCLogFile(cdcLogPathStr));
+  }
+
+  /**
+   * Parses a log file whose name ends in {@code _5.log.lance} and checks that it is a non-CDC
+   * log file with extension {@code log} and suffix {@code lance}.
+   */
+  @Test
+  void createFromNativeLanceLogFile() {
+    StoragePath lanceLogPath = new StoragePath("file:///tmp/hoodie/2021/01/01/"
+        + "136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_20250409161256974_5.log.lance");
+    HoodieLogFile parsed = new HoodieLogFile(lanceLogPath);
+
+    // Classification: a plain log file.
+    assertTrue(FSUtils.isLogFile(lanceLogPath));
+    assertFalse(FSUtils.isNativeDeleteLogFile(lanceLogPath.getName()));
+    assertFalse(FSUtils.isBaseFile(lanceLogPath));
+    assertFalse(parsed.isCDC());
+
+    // Name components.
+    assertEquals(fileId, parsed.getFileId());
+    assertEquals("20250409161256974", parsed.getDeltaCommitTime());
+    assertEquals(5, parsed.getLogVersion());
+    assertEquals("1-0-1", parsed.getLogWriteToken());
+    assertEquals("log", parsed.getFileExtension());
+    assertEquals("lance", parsed.getSuffix());
+  }
+
+  /**
+   * Parses a log file whose name ends in {@code _6.cdc.lance} and checks that it is reported as
+   * a CDC log file with extension {@code cdc} and suffix {@code lance}.
+   */
+  @Test
+  void createFromNativeCdcLanceLogFile() {
+    // The raw string is kept because FSUtils.isCDCLogFile is called with it below.
+    String cdcLanceLogPathStr = "file:///tmp/hoodie/2021/01/01/"
+        + "136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_20250409161256974_6.cdc.lance";
+    StoragePath cdcLanceLogPath = new StoragePath(cdcLanceLogPathStr);
+    HoodieLogFile parsed = new HoodieLogFile(cdcLanceLogPath);
+
+    // Classification: a log file, not a delete log, not a base file.
+    assertTrue(FSUtils.isLogFile(cdcLanceLogPath));
+    assertFalse(FSUtils.isNativeDeleteLogFile(cdcLanceLogPath.getName()));
+    assertFalse(FSUtils.isBaseFile(cdcLanceLogPath));
+
+    // Name components.
+    assertEquals(fileId, parsed.getFileId());
+    assertEquals("20250409161256974", parsed.getDeltaCommitTime());
+    assertEquals(6, parsed.getLogVersion());
+    assertEquals("1-0-1", parsed.getLogWriteToken());
+    assertEquals("cdc", parsed.getFileExtension());
+    assertEquals("lance", parsed.getSuffix());
+
+    // CDC detection through both the instance and the static entry point.
+    assertTrue(parsed.isCDC());
+    assertTrue(FSUtils.isCDCLogFile(cdcLanceLogPathStr));
+  }
+
+  /**
+   * Parses a log file whose name ends in {@code _7.log.custom}. The test expects the file to be
+   * accepted as a non-CDC log file, with {@code custom} reported verbatim as the suffix.
+   */
+  @Test
+  void createFromNativeLogFileWithUnknownFormatSuffix() {
+    // The raw string is kept because FSUtils.isCDCLogFile is called with it below.
+    String customLogPathStr = "file:///tmp/hoodie/2021/01/01/"
+        + "136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_20250409161256974_7.log.custom";
+    StoragePath customLogPath = new StoragePath(customLogPathStr);
+    HoodieLogFile parsed = new HoodieLogFile(customLogPath);
+
+    // Classification: a plain log file.
+    assertTrue(FSUtils.isLogFile(customLogPath));
+    assertFalse(FSUtils.isNativeDeleteLogFile(customLogPath.getName()));
+    assertFalse(FSUtils.isBaseFile(customLogPath));
+    assertFalse(parsed.isCDC());
+
+    // Name components.
+    assertEquals(fileId, parsed.getFileId());
+    assertEquals("20250409161256974", parsed.getDeltaCommitTime());
+    assertEquals(7, parsed.getLogVersion());
+    assertEquals("1-0-1", parsed.getLogWriteToken());
+    assertEquals("log", parsed.getFileExtension());
+    assertEquals("custom", parsed.getSuffix());
+
+    assertFalse(FSUtils.isCDCLogFile(customLogPathStr));
+  }
+
+  /**
+   * Sorts a {@code .log.parquet} file and a {@code .deletes.parquet} file that share file id,
+   * write token, instant and version 8, and expects
+   * {@link HoodieLogFile#getLogFileComparator()} to place the {@code .log} file first.
+   */
+  @Test
+  void logFileComparatorOrdersNativeDeletesAfterLogsForSameVersion() {
+    HoodieLogFile dataLog = new HoodieLogFile(new StoragePath(
+        "/tmp/136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_20250409161256974_8.log.parquet"));
+    HoodieLogFile deleteLog = new HoodieLogFile(new StoragePath(
+        "/tmp/136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_20250409161256974_8.deletes.parquet"));
+
+    // Start from the reverse of the expected order so the sort has to swap the two entries.
+    List<HoodieLogFile> sorted = Arrays.asList(deleteLog, dataLog);
+    sorted.sort(HoodieLogFile.getLogFileComparator());
+
+    assertEquals(Arrays.asList(dataLog, deleteLog), sorted);
+  }
+
private void assertFileGetters(StoragePathInfo pathInfo, HoodieLogFile
hoodieLogFile,
long fileLength) {
assertFileGetters(pathStr, pathInfo, hoodieLogFile, fileLength, "");
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestInputSplit.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestInputSplit.java
new file mode 100644
index 000000000000..4ef72d8cf96b
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestInputSplit.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.storage.StoragePath;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestInputSplit {
+
+  /**
+   * Feeds six log files (plain and CDC, in the dot-prefixed legacy naming and in the native
+   * parquet/lance naming) into an {@link InputSplit} and expects only the three non-CDC files
+   * back, in the order they were supplied.
+   */
+  @Test
+  void filtersLegacyAndNativeCdcLogFiles() {
+    HoodieLogFile legacyLog = logFileAt("/tmp/.file1_001.log.1_1-0-1");
+    HoodieLogFile legacyCdcLog = logFileAt("/tmp/.file1_001.log.2_1-0-1.cdc");
+    HoodieLogFile parquetLog = logFileAt("/tmp/file1_1-0-1_001_1.log.parquet");
+    HoodieLogFile parquetCdcLog = logFileAt("/tmp/file1_1-0-1_001_2.cdc.parquet");
+    HoodieLogFile lanceLog = logFileAt("/tmp/file1_1-0-1_001_3.log.lance");
+    HoodieLogFile lanceCdcLog = logFileAt("/tmp/file1_1-0-1_001_4.cdc.lance");
+
+    List<HoodieLogFile> candidates = Arrays.asList(
+        legacyLog, legacyCdcLog, parquetLog, parquetCdcLog, lanceLog, lanceCdcLog);
+    InputSplit split = InputSplit.builder()
+        .logFileStream(candidates.stream())
+        .build();
+
+    // Every CDC file is dropped; the survivors keep their relative input order.
+    assertEquals(Arrays.asList(legacyLog, parquetLog, lanceLog), split.getLogFiles());
+  }
+
+  /** Builds a {@link HoodieLogFile} from a path string. */
+  private static HoodieLogFile logFileAt(String path) {
+    return new HoodieLogFile(new StoragePath(path));
+  }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/lsm/TestLsmFileGroupRecordIterator.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/lsm/TestLsmFileGroupRecordIterator.java
new file mode 100644
index 000000000000..006f8cac207e
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/lsm/TestLsmFileGroupRecordIterator.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.RecordContext;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemas;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecords;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.storage.StoragePath;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the static helpers and nested types of {@link LsmFileGroupRecordIterator}:
+ * the delete-log schema, the loser-tree merge order, native delete record creation and the
+ * selection of log readers by merge order.
+ */
+class TestLsmFileGroupRecordIterator {
+
+  /**
+   * The delete-log schema derived from the table schema with ordering field {@code ts} must
+   * consist of exactly {@code record_key} (STRING) followed by {@code ts} (LONG).
+   */
+  @Test
+  void testDeleteLogSchemaUsesRecordKeyAndOrderingFields() {
+    HoodieSchema deleteLogSchema =
+        HoodieSchemas.createDeleteLogSchema(tableSchema(), Arrays.asList("ts"));
+
+    assertEquals(Arrays.asList("record_key", "ts"), deleteLogSchema.getFields().stream()
+        .map(HoodieSchemaField::name)
+        .collect(Collectors.toList()));
+    assertEquals(HoodieSchemaType.STRING,
+        deleteLogSchema.getField("record_key").get().schema().getType());
+    assertEquals(HoodieSchemaType.LONG,
+        deleteLogSchema.getField("ts").get().schema().getType());
+  }
+
+  /**
+   * Three sorted runs with merge orders 0, 1 and 2 are merged through the loser tree. Records
+   * must come out ordered by record key, and for equal keys by ascending merge order.
+   */
+  @Test
+  void testLoserTreeMergesByRecordKeyThenMergeOrder() {
+    List<LsmFileGroupRecordIterator.SortedRunReader<String>> readers = Arrays.asList(
+        sortedRunReader(0, record("key1", "base-key1"), record("key3", "base-key3")),
+        sortedRunReader(1, record("key1", "log1-key1"), record("key2", "log1-key2")),
+        sortedRunReader(2, BufferedRecords.createDelete("key1", 3), record("key3", "log2-key3")));
+
+    LsmFileGroupRecordIterator.LoserTree<String> loserTree =
+        new LsmFileGroupRecordIterator.LoserTree<>(readers);
+
+    assertEquals(Arrays.asList(
+        "key1:base-key1",
+        "key1:log1-key1",
+        "key1:DELETE",
+        "key2:log1-key2",
+        "key3:base-key3",
+        "key3:log2-key3"), drain(loserTree));
+  }
+
+  /**
+   * With the record context mocked to return key {@code key1} and ordering value {@code 42L},
+   * the created native delete record must carry both values and be flagged as a delete.
+   */
+  @Test
+  void testNativeDeleteRecordPreservesOrderingValue() {
+    HoodieReaderContext<Map<String, Object>> readerContext = mock(HoodieReaderContext.class);
+    RecordContext<Map<String, Object>> recordContext = mock(RecordContext.class);
+    Map<String, Object> record = Collections.emptyMap();
+    List<String> orderingFields = Collections.singletonList("ts");
+    HoodieSchema deleteLogSchema =
+        HoodieSchemas.createDeleteLogSchema(tableSchema(), orderingFields);
+
+    when(readerContext.getRecordContext()).thenReturn(recordContext);
+    when(recordContext.getValue(record, deleteLogSchema, "record_key")).thenReturn("key1");
+    when(recordContext.getOrderingValue(eq(record), eq(deleteLogSchema), eq(orderingFields)))
+        .thenReturn(42L);
+
+    BufferedRecord<Map<String, Object>> deleteRecord =
+        LsmFileGroupRecordIterator.createNativeDeleteRecord(
+            readerContext, record, deleteLogSchema, orderingFields);
+
+    assertEquals("key1", deleteRecord.getRecordKey());
+    assertEquals(42L, deleteRecord.getOrderingValue());
+    assertTrue(deleteRecord.isDelete());
+  }
+
+  /**
+   * Given a 10-byte log (merge order 1), a 100-byte deletes file (merge order 2) and a 5-byte
+   * log (merge order 3), both invocations below must select merge orders 2 and 3.
+   */
+  @Test
+  void testSelectDirectLogReadersPrioritizesDeletesThenSmallFiles() {
+    List<LsmFileGroupRecordIterator.LogReaderSpec> logReaderSpecs = Arrays.asList(
+        new LsmFileGroupRecordIterator.LogReaderSpec(
+            1, logFile("file1_1-0-1_001_1.log.parquet", 10)),
+        new LsmFileGroupRecordIterator.LogReaderSpec(
+            2, logFile("file1_1-0-1_002_1.deletes.parquet", 100)),
+        new LsmFileGroupRecordIterator.LogReaderSpec(
+            3, logFile("file1_1-0-1_003_1.log.parquet", 5)));
+
+    // NOTE(review): the boolean/int arguments presumably mean "has base file" and the reader
+    // budget, as the local variable names suggest - confirm against selectDirectLogMergeOrders.
+    Set<Integer> directReadersWithBase =
+        LsmFileGroupRecordIterator.selectDirectLogMergeOrders(logReaderSpecs, true, 3);
+    Set<Integer> directReadersWithoutBase =
+        LsmFileGroupRecordIterator.selectDirectLogMergeOrders(logReaderSpecs, false, 2);
+
+    assertEquals(new HashSet<>(Arrays.asList(2, 3)), directReadersWithBase);
+    assertEquals(new HashSet<>(Arrays.asList(2, 3)), directReadersWithoutBase);
+  }
+
+  /**
+   * Wraps the given records in a {@code SortedRunReader} with the given merge order and
+   * advances it once, asserting that the advance succeeds.
+   *
+   * <p>The varargs array is only read (wrapped via {@link Arrays#asList}), never written to,
+   * so {@code @SafeVarargs} is sound and removes the unchecked generic-array-creation warning
+   * at every call site.
+   */
+  @SafeVarargs
+  private static LsmFileGroupRecordIterator.SortedRunReader<String> sortedRunReader(
+      int mergeOrder, BufferedRecord<String>... records) {
+    LsmFileGroupRecordIterator.SortedRunReader<String> reader =
+        new LsmFileGroupRecordIterator.SortedRunReader<>(
+            mergeOrder, ClosableIterator.wrap(Arrays.asList(records).iterator()));
+    assertTrue(reader.advance());
+    return reader;
+  }
+
+  /** Creates a non-delete record with the given key and payload value. */
+  private static BufferedRecord<String> record(String recordKey, String value) {
+    return new BufferedRecord<>(recordKey, 1, value, null, null);
+  }
+
+  /** Two-column table schema: {@code id} (STRING) and {@code ts} (LONG). */
+  private static HoodieSchema tableSchema() {
+    return HoodieSchema.createRecord("test_record", null, null, Arrays.asList(
+        HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.STRING)),
+        HoodieSchemaField.of("ts", HoodieSchema.create(HoodieSchemaType.LONG))));
+  }
+
+  /**
+   * Pops winners until the tree is empty, rendering each record as {@code key:value}, or
+   * {@code key:DELETE} for delete records.
+   */
+  private static List<String> drain(LsmFileGroupRecordIterator.LoserTree<String> loserTree) {
+    List<String> records = new ArrayList<>();
+    while (!loserTree.isEmpty()) {
+      BufferedRecord<String> record = loserTree.popWinner();
+      records.add(record.getRecordKey() + ":" + (record.isDelete() ? "DELETE" : record.getRecord()));
+    }
+    return records;
+  }
+
+  /** Creates a log file under {@code /tmp/} with the given name and file size. */
+  private static HoodieLogFile logFile(String fileName, long fileSize) {
+    return new HoodieLogFile(new StoragePath("/tmp/" + fileName), fileSize);
+  }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/lsm/TestSpillableLsmRecordIterator.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/lsm/TestSpillableLsmRecordIterator.java
new file mode 100644
index 000000000000..7984435ea2b6
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/lsm/TestSpillableLsmRecordIterator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.serialization.DefaultSerializer;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestSpillableLsmRecordIterator {
+
+ @TempDir
+ Path tempDir;
+
+ @Test
+ void testSpillAndReadBackSequentially() throws IOException {
+ List<BufferedRecord<String>> records = Arrays.asList(
+ new BufferedRecord<>("key1", 1, null, null, null),
+ new BufferedRecord<>("key2", 2, null, null, null),
+ new BufferedRecord<>("key3", 3, null, null, null));
+
+ SpillableLsmRecordIterator<String> iterator = new
SpillableLsmRecordIterator<>(
+ ClosableIterator.wrap(records.iterator()), new DefaultSerializer<>(),
null, tempDir.toString());
+
+ assertEquals(1, spillFileCount());
+ assertTrue(iterator.hasNext());
+ assertTrue(iterator.hasNext());
+ assertEquals(records.get(0), iterator.next());
+ assertEquals(records.get(1), iterator.next());
+ assertEquals(records.get(2), iterator.next());
+ assertFalse(iterator.hasNext());
+
+ iterator.close();
+ assertEquals(0, spillFileCount());
+ }
+
+ @Test
+ void testSpillFailurePreservesSourceCloseFailureAsSuppressed() throws
IOException {
+ Path spillBaseFile = Files.createTempFile(tempDir, "spill-base", ".tmp");
+ RuntimeException closeFailure = new RuntimeException("source close
failed");
+
+ HoodieIOException exception = assertThrows(HoodieIOException.class, () ->
new SpillableLsmRecordIterator<>(
+ closeFailingIterator(closeFailure), new DefaultSerializer<>(), null,
spillBaseFile.toString()));
+
+ assertSame(closeFailure, exception.getCause().getSuppressed()[0]);
+ }
+
+ private long spillFileCount() throws IOException {
+ try (Stream<Path> paths = Files.list(tempDir)) {
+ return paths.count();
+ }
+ }
+
+ private ClosableIterator<BufferedRecord<String>>
closeFailingIterator(RuntimeException closeFailure) {
+ return new ClosableIterator<BufferedRecord<String>>() {
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public BufferedRecord<String> next() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() {
+ throw closeFailure;
+ }
+ };
+ }
+}