Copilot commented on code in PR #7426:
URL: https://github.com/apache/paimon/pull/7426#discussion_r2937846997


##########
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFiles.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact.clustering;
+
+import org.apache.paimon.io.DataFileMeta;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Flat file management for clustering key value table. All files are treated as a flat list of
+ * sorted runs without level distinction. Unsorted files (level 0) are new writes pending
+ * compaction; sorted files (level 1) have been compacted and sorted by clustering columns.
+ */
+@ThreadSafe
+public class ClusteringFiles {
+
+    /** Unsorted files (level 0): new writes that have not been sorted yet. */
+    private final List<DataFileMeta> unsortedFiles = new ArrayList<>();
+
+    /** Sorted files (level 1): files that have been sorted by clustering columns. */
+    private final List<DataFileMeta> sortedFiles = new ArrayList<>();
+
+    private final Map<Integer, DataFileMeta> idToFileMap = new HashMap<>();
+    private final Map<String, Integer> fileNameToIdMap = new HashMap<>();
+
+    private int currentFileId = 0;
+
+    public synchronized void addNewFile(DataFileMeta file) {
+        if (file.level() == 0) {
+            unsortedFiles.add(file);
+        } else {
+            sortedFiles.add(file);
+            registerFileId(file);
+        }
+    }
+
+    private void registerFileId(DataFileMeta file) {
+        if (fileNameToIdMap.containsKey(file.fileName())) {
+            return;
+        }
+        idToFileMap.put(currentFileId, file);
+        fileNameToIdMap.put(file.fileName(), currentFileId);
+        currentFileId++;
+    }
+
+    public synchronized void removeFile(DataFileMeta file) {
+        if (file.level() == 0) {
+            unsortedFiles.remove(file);
+        } else {
+            sortedFiles.remove(file);
+        }
+    }
+
+    public synchronized DataFileMeta getFileById(int fileId) {
+        return idToFileMap.get(fileId);
+    }
+
+    public synchronized int getFileIdByName(String fileName) {
+        return fileNameToIdMap.get(fileName);
+    }
+
+    public synchronized List<DataFileMeta> allFiles() {
+        List<DataFileMeta> result = new ArrayList<>(unsortedFiles.size() + 
sortedFiles.size());
+        result.addAll(unsortedFiles);
+        result.addAll(sortedFiles);
+        return result;
+    }
+
+    /** Returns a snapshot of the unsorted files list. */
+    public synchronized List<DataFileMeta> unsortedFiles() {
+        return new ArrayList<>(unsortedFiles);
+    }
+
+    /**
+     * Returns a snapshot of the sorted files list, ordered from newest to oldest (insertion order).

Review Comment:
   The Javadoc says the returned list is "ordered from newest to oldest 
(insertion order)", but insertion order here is oldest-to-newest (`addNewFile` 
appends). Please clarify/correct the ordering guarantee to avoid incorrect 
assumptions by callers.
   



##########
paimon-core/src/main/java/org/apache/paimon/io/KeyValueClusteringFileWriter.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.KeyValueSerializer;
+import org.apache.paimon.KeyValueThinSerializer;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.format.SimpleColStats;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileIndexWriter.FileIndexResult;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.stats.SimpleStatsConverter;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath;
+
+/**
+ * File writer for clustering mode. Different from the normal KeyValue file writer, in this class
+ * minKey and maxKey store the clustering field instead of the primary key field.
+ */
+public class KeyValueClusteringFileWriter
+        extends StatsCollectingSingleFileWriter<KeyValue, DataFileMeta> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KeyValueClusteringFileWriter.class);
+
+    protected final RowType keyType;
+    protected final RowType valueType;
+    private final long schemaId;
+    private final int level;
+
+    private final SimpleStatsConverter keyStatsConverter;
+    private final boolean thinMode;
+    private final boolean isExternalPath;
+    private final SimpleStatsConverter valueStatsConverter;
+    private final Projection clusteringProjection;
+    @Nullable private final DataFileIndexWriter dataFileIndexWriter;
+    private final int[] keyStatMapping;
+
+    private BinaryRow minClusteringFields = null;
+    private BinaryRow currentClusteringFields = null;
+    private long minSeqNumber = Long.MAX_VALUE;
+    private long maxSeqNumber = Long.MIN_VALUE;
+    private long deleteRecordCount = 0;
+
+    public KeyValueClusteringFileWriter(
+            FileIO fileIO,
+            FileWriterContext context,
+            Path path,
+            RowType keyType,
+            RowType valueType,
+            long schemaId,
+            int level,
+            boolean thinMode,
+            CoreOptions options,
+            FileIndexOptions fileIndexOptions,
+            boolean isExternalPath) {
+        super(
+                fileIO,
+                context,
+                path,
+                thinMode
+                        ? new KeyValueThinSerializer(keyType, valueType)::toRow
+                        : new KeyValueSerializer(keyType, valueType)::toRow,
+                KeyValue.schema(RowType.of(), valueType),
+                options.asyncFileWrite());
+        this.keyType = keyType;
+        this.valueType = valueType;
+        this.schemaId = schemaId;
+        this.level = level;
+
+        this.keyStatsConverter = new SimpleStatsConverter(keyType);
+        this.thinMode = thinMode;
+        this.isExternalPath = isExternalPath;
+        this.valueStatsConverter = new SimpleStatsConverter(valueType, 
options.statsDenseStore());
+        this.clusteringProjection =
+                CodeGenUtils.newProjection(valueType, 
options.clusteringColumns());
+        this.dataFileIndexWriter =
+                DataFileIndexWriter.create(
+                        fileIO, dataFileToFileIndexPath(path), valueType, 
fileIndexOptions);
+
+        Map<Integer, Integer> idToIndex = new 
HashMap<>(valueType.getFieldCount());
+        for (int i = 0; i < valueType.getFieldCount(); i++) {
+            idToIndex.put(valueType.getFields().get(i).id(), i);
+        }
+        this.keyStatMapping = new int[keyType.getFieldCount()];
+        for (int i = 0; i < keyType.getFieldCount(); i++) {
+            keyStatMapping[i] =
+                    idToIndex.get(
+                            keyType.getFields().get(i).id() - 
SpecialFields.KEY_FIELD_ID_START);
+        }
+    }
+
+    @Override
+    public void write(KeyValue kv) throws IOException {
+        super.write(kv);
+
+        if (dataFileIndexWriter != null) {
+            dataFileIndexWriter.write(kv.value());
+        }
+
+        currentClusteringFields = clusteringProjection.apply(kv.value());
+        if (minClusteringFields == null) {
+            minClusteringFields = currentClusteringFields.copy();
+        }
+
+        updateMinSeqNumber(kv);
+        updateMaxSeqNumber(kv);
+
+        if (kv.valueKind().isRetract()) {
+            deleteRecordCount++;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Write to Path {} key value {}", path, 
kv.toString(keyType, valueType));
+        }
+    }
+
+    private void updateMinSeqNumber(KeyValue kv) {
+        minSeqNumber = Math.min(minSeqNumber, kv.sequenceNumber());
+    }
+
+    private void updateMaxSeqNumber(KeyValue kv) {
+        maxSeqNumber = Math.max(maxSeqNumber, kv.sequenceNumber());
+    }
+
+    @Override
+    @Nullable
+    public DataFileMeta result() throws IOException {
+        if (recordCount() == 0) {
+            return null;
+        }
+
+        long fileSize = outputBytes();
+        SimpleColStats[] rowStats = fieldStats(fileSize);
+        int offset = thinMode ? 2 : keyType.getFieldCount() + 2;
+        SimpleColStats[] valFieldStats = Arrays.copyOfRange(rowStats, offset, 
rowStats.length);
+        SimpleColStats[] keyColStats = new 
SimpleColStats[keyType.getFieldCount()];
+        for (int i = 0; i < keyStatMapping.length; i++) {
+            keyColStats[i] = valFieldStats[keyStatMapping[i]];
+        }
+
+        SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyColStats);
+        Pair<List<String>, SimpleStats> valueStatsPair =
+                valueStatsConverter.toBinary(valFieldStats);
+
+        FileIndexResult indexResult =
+                dataFileIndexWriter == null
+                        ? DataFileIndexWriter.EMPTY_RESULT
+                        : dataFileIndexWriter.result();
+
+        String externalPath = isExternalPath ? path.toString() : null;
+        return DataFileMeta.create(
+                path.getName(),
+                fileSize,
+                recordCount(),
+                minClusteringFields,
+                currentClusteringFields,

Review Comment:
   `DataFileMeta` stores `minKey`/`maxKey` and expects them to be stable. 
`minClusteringFields` is copied, but `currentClusteringFields` is passed 
directly as `maxKey`; if the generated `Projection` reuses its `BinaryRow` 
instance, this can make `maxKey` mutable/incorrect. Consider storing a 
defensive copy for the max clustering key as well.
   



##########
paimon-common/src/main/java/org/apache/paimon/utils/VarLengthIntUtils.java:
##########
@@ -136,4 +156,15 @@ public static int decodeInt(DataInput is) throws IOException {
         }
         throw new Error("Malformed integer.");
     }
+
+    public static int decodeInt(InputStream is) throws IOException {
+        for (int offset = 0, result = 0; offset < 32; offset += 7) {
+            int b = is.read() & 0xFF;
+            result |= (b & 0x7F) << offset;
+            if ((b & 0x80) == 0) {
+                return result;
+            }

Review Comment:
   `decodeInt(InputStream)` masks `is.read()` with `& 0xFF` but does not handle 
EOF (`read()` returning -1). This can mis-decode truncated streams as 0xFF 
bytes and either return an incorrect value or throw a misleading "Malformed 
integer" error. Consider explicitly checking for `-1` and throwing an 
`EOFException` (or `IOException`) when the stream ends prematurely.



##########
docs/content/primary-key-table/pk-clustering-override.md:
##########
@@ -0,0 +1,108 @@
+---
+title: "PK Clustering Override"
+weight: 10
+type: docs
+aliases:
+- /primary-key-table/pk-clustering-override.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# PK Clustering Override
+
+By default, data files in a primary key table are physically sorted by the primary key. This is optimal for point
+lookups but can hurt scan performance when queries filter on non-primary-key columns.
+
+**PK Clustering Override** mode changes the physical sort order of data files from the primary key to user-specified
+clustering columns. This significantly improves scan performance for queries that filter or group by clustering columns,
+while still maintaining primary key uniqueness through deletion vectors.
+
+## Quick Start
+
+```sql
+CREATE TABLE my_table (
+    id BIGINT,
+    dt STRING,
+    city STRING,
+    amount DOUBLE,
+    PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+    'pk-clustering-override' = 'true',
+    'clustering.columns' = 'city',
+    'deletion-vectors.enabled' = 'true',
+    'bucket' = '4'
+);
+```
+
+After this, data files within each bucket will be physically sorted by `city` instead of `id`. Queries like
+`SELECT * FROM my_table WHERE city = 'Beijing'` can skip irrelevant data files by checking their min/max statistics
+on the clustering column.
+
+## How It Works
+
+PK Clustering Override replaces the default LSM compaction with a two-phase clustering compaction:
+
+**Phase 1 — Sort by Clustering Columns**: Newly flushed (level 0) files are read, sorted by the configured clustering
+columns, and rewritten as sorted (level 1) files. A key index tracks each primary key's file and row position to
+maintain uniqueness.
+
+**Phase 2 — Merge Overlapping Sections**: Sorted files are grouped into sections based on clustering column range
+overlap. Overlapping sections are merged together. Adjacent small sections are also consolidated to reduce file count
+and IO amplification. Non-overlapping large files are left untouched.
+
+During both phases, deduplication is handled via deletion vectors:
+
+- **Deduplicate mode**: When a key already exists in an older file, the old row is marked as deleted.
+- **First-row mode**: When a key already exists, the new row is marked as deleted, keeping the first-seen value.
+
+When the number of files to merge exceeds `sort-spill-threshold`, smaller files are first spilled to row-based
+temporary files to reduce memory consumption, preventing OOM during multi-way merge.
+
+## Requirements
+
+| Option | Requirement |
+|--------|-------------|
+| `pk-clustering-override` | `true` |
+| `clustering.columns` | Must be set (one or more non-primary-key columns) |
+| `deletion-vectors.enabled` | Must be `true` |

Review Comment:
   The markdown tables use `||` at the start of each row (e.g., `|| Option | 
Requirement |`), which won’t render as a proper table in standard Markdown. 
These should be single-pipe table rows (e.g., `| Option | Requirement |`) for 
both the Requirements and Related Options sections.



##########
paimon-core/src/main/java/org/apache/paimon/io/KeyValueClusteringFileWriter.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.KeyValueSerializer;
+import org.apache.paimon.KeyValueThinSerializer;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.format.SimpleColStats;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileIndexWriter.FileIndexResult;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.stats.SimpleStatsConverter;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath;
+
+/**
+ * File writer for clustering mode. Different from the normal KeyValue file writer, in this class
+ * minKey and maxKey store the clustering field instead of the primary key field.
+ */
+public class KeyValueClusteringFileWriter
+        extends StatsCollectingSingleFileWriter<KeyValue, DataFileMeta> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KeyValueClusteringFileWriter.class);
+
+    protected final RowType keyType;
+    protected final RowType valueType;
+    private final long schemaId;
+    private final int level;
+
+    private final SimpleStatsConverter keyStatsConverter;
+    private final boolean thinMode;
+    private final boolean isExternalPath;
+    private final SimpleStatsConverter valueStatsConverter;
+    private final Projection clusteringProjection;
+    @Nullable private final DataFileIndexWriter dataFileIndexWriter;
+    private final int[] keyStatMapping;
+
+    private BinaryRow minClusteringFields = null;
+    private BinaryRow currentClusteringFields = null;
+    private long minSeqNumber = Long.MAX_VALUE;
+    private long maxSeqNumber = Long.MIN_VALUE;
+    private long deleteRecordCount = 0;
+
+    public KeyValueClusteringFileWriter(
+            FileIO fileIO,
+            FileWriterContext context,
+            Path path,
+            RowType keyType,
+            RowType valueType,
+            long schemaId,
+            int level,
+            boolean thinMode,
+            CoreOptions options,
+            FileIndexOptions fileIndexOptions,
+            boolean isExternalPath) {
+        super(
+                fileIO,
+                context,
+                path,
+                thinMode
+                        ? new KeyValueThinSerializer(keyType, valueType)::toRow
+                        : new KeyValueSerializer(keyType, valueType)::toRow,
+                KeyValue.schema(RowType.of(), valueType),

Review Comment:
   The `rowType` passed to `StatsCollectingSingleFileWriter` is always 
`KeyValue.schema(RowType.of(), valueType)`. In non-thin mode, this does not 
match the actual rows produced by `KeyValueSerializer` (which include key 
fields), and will break `fieldStats(...)` / `result()` when stats are disabled 
(wrong stats array length / offsets). Consider passing the same write schema 
used by `KeyValueFileWriterFactory` (i.e., `KeyValue.schema(thinMode ? 
RowType.of() : keyType, valueType)`).
   



##########
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java:
##########
@@ -0,0 +1,796 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact.clustering;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.KeyValueFileStore;
+import org.apache.paimon.KeyValueSerializer;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.compact.CompactDeletionFile;
+import org.apache.paimon.compact.CompactFutureManager;
+import org.apache.paimon.compact.CompactResult;
+import org.apache.paimon.compact.CompactTask;
+import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.RowCompactedSerializer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
+import org.apache.paimon.disk.ChannelReaderInputView;
+import org.apache.paimon.disk.ChannelReaderInputViewIterator;
+import org.apache.paimon.disk.ChannelWithMeta;
+import org.apache.paimon.disk.ChannelWriterOutputView;
+import org.apache.paimon.disk.FileChannelUtil;
+import org.apache.paimon.disk.FileIOChannel;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.KeyValueFileReaderFactory;
+import org.apache.paimon.io.KeyValueFileWriterFactory;
+import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.lookup.sort.db.SimpleLsmKvDb;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordReader.RecordIterator;
+import org.apache.paimon.sort.BinaryExternalSortBuffer;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
+import org.apache.paimon.utils.MutableObjectIterator;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.IntStream;
+
+import static java.util.Collections.singletonList;
+import static org.apache.paimon.utils.VarLengthIntUtils.decodeInt;
+import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
+
+/**
+ * Key Value clustering compact manager for {@link KeyValueFileStore}.
+ *
+ * <p>Compaction is triggered when unsorted files exist. The compaction process has two phases:
+ *
+ * <ol>
+ *   <li><b>Phase 1</b>: Sort and rewrite all unsorted (level 0) files by clustering columns.
+ *   <li><b>Phase 2</b>: Merge sorted files based on clustering column key range overlap. Files are
+ *       grouped into sections where each section contains overlapping files. Adjacent sections are
+ *       merged when beneficial (overlapping files or small sections) to reduce IO amplification
+ *       while consolidating small files.
+ * </ol>
+ */
+public class ClusteringCompactManager extends CompactFutureManager {
+
+    private final RowType keyType;
+    private final RowType valueType;
+    private final long sortSpillBufferSize;
+    private final int pageSize;
+    private final int maxNumFileHandles;
+    private final int spillThreshold;
+    private final CompressOptions compression;
+    private final int[] clusteringColumns;
+    private final RecordComparator clusteringComparatorAlone;
+    private final RecordComparator clusteringComparatorInValue;
+    private final IOManager ioManager;
+    private final KeyValueFileReaderFactory keyReaderFactory;
+    private final KeyValueFileReaderFactory valueReaderFactory;
+    private final KeyValueFileWriterFactory writerFactory;
+    private final ExecutorService executor;
+    private final BucketedDvMaintainer dvMaintainer;
+    private final SimpleLsmKvDb kvDb;
+    private final boolean lazyGenDeletionFile;
+    private final boolean firstRow;
+    @Nullable private final CompactionMetrics.Reporter metricsReporter;
+
+    private final ClusteringFiles fileLevels;
+    private final long targetFileSize;
+
+    public ClusteringCompactManager(
+            RowType keyType,
+            RowType valueType,
+            List<String> clusteringColumns,
+            IOManager ioManager,
+            CacheManager cacheManager,
+            KeyValueFileReaderFactory keyReaderFactory,
+            KeyValueFileReaderFactory valueReaderFactory,
+            KeyValueFileWriterFactory writerFactory,
+            ExecutorService executor,
+            BucketedDvMaintainer dvMaintainer,
+            boolean lazyGenDeletionFile,
+            List<DataFileMeta> restoreFiles,
+            long targetFileSize,
+            long sortSpillBufferSize,
+            int pageSize,
+            int maxNumFileHandles,
+            int spillThreshold,
+            CompressOptions compression,
+            boolean firstRow,
+            @Nullable CompactionMetrics.Reporter metricsReporter) {
+        this.targetFileSize = targetFileSize;
+        this.keyType = keyType;
+        this.valueType = valueType;
+        this.sortSpillBufferSize = sortSpillBufferSize;
+        this.pageSize = pageSize;
+        this.maxNumFileHandles = maxNumFileHandles;
+        this.spillThreshold = spillThreshold;
+        this.compression = compression;
+        this.firstRow = firstRow;
+        this.clusteringColumns = valueType.projectIndexes(clusteringColumns);
+        this.clusteringComparatorAlone =
+                CodeGenUtils.newRecordComparator(
+                        valueType.project(clusteringColumns).getFieldTypes(),
+                        IntStream.range(0, clusteringColumns.size()).toArray(),
+                        true);
+        this.clusteringComparatorInValue =
+                CodeGenUtils.newRecordComparator(
+                        valueType.getFieldTypes(), this.clusteringColumns, 
true);
+        this.ioManager = ioManager;
+        this.keyReaderFactory = keyReaderFactory;
+        this.valueReaderFactory = valueReaderFactory;
+        this.writerFactory = writerFactory;
+        this.executor = executor;
+        this.dvMaintainer = dvMaintainer;
+        this.lazyGenDeletionFile = lazyGenDeletionFile;
+        this.metricsReporter = metricsReporter;
+        this.fileLevels = new ClusteringFiles();
+        restoreFiles.forEach(this::addNewFile);
+
+        this.kvDb =
+                SimpleLsmKvDb.builder(new File(ioManager.pickRandomTempDir()))
+                        .cacheManager(cacheManager)
+                        .keyComparator(new 
RowCompactedSerializer(keyType).createSliceComparator())
+                        .build();
+        bootstrapKeyIndex(restoreFiles);
+    }
+
+    private void bootstrapKeyIndex(List<DataFileMeta> restoreFiles) {
+        RowCompactedSerializer keySerializer = new 
RowCompactedSerializer(keyType);
+        for (DataFileMeta file : restoreFiles) {
+            if (file.level() == 0) {
+                continue;
+            }
+            int fileId = fileLevels.getFileIdByName(file.fileName());
+            // Read with DV (auto-skips deleted rows). Use 
FileRecordIterator.returnedPosition()
+            // to get correct physical positions even after DV filtering.
+            try (RecordReader<KeyValue> reader = 
keyReaderFactory.createRecordReader(file)) {
+                FileRecordIterator<KeyValue> batch;
+                while ((batch = (FileRecordIterator<KeyValue>) 
reader.readBatch()) != null) {
+                    KeyValue kv;
+                    while ((kv = batch.next()) != null) {
+                        int position = (int) batch.returnedPosition();
+                        byte[] keyBytes = 
keySerializer.serializeToBytes(kv.key());
+                        ByteArrayOutputStream value = new 
ByteArrayOutputStream(8);
+                        encodeInt(value, fileId);
+                        encodeInt(value, position);
+                        kvDb.put(keyBytes, value.toByteArray());
+                    }
+                    batch.releaseBatch();
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private CloseableIterator<InternalRow> readKeyIterator(DataFileMeta file) 
throws IOException {
+        //noinspection resource
+        return keyReaderFactory
+                .createRecordReader(file)
+                .transform(KeyValue::key)
+                .toCloseableIterator();
+    }
+
+    @Override
+    public boolean shouldWaitForLatestCompaction() {
+        return false;
+    }
+
+    @Override
+    public boolean shouldWaitForPreparingCheckpoint() {
+        return false;
+    }
+
+    @Override
+    public void addNewFile(DataFileMeta file) {
+        fileLevels.addNewFile(file);
+    }
+
+    @Override
+    public List<DataFileMeta> allFiles() {
+        return fileLevels.allFiles();
+    }
+
+    @Override
+    public void triggerCompaction(boolean fullCompaction) {
+        taskFuture =
+                executor.submit(
+                        new CompactTask(metricsReporter) {
+                            @Override
+                            protected CompactResult doCompact() throws 
Exception {
+                                return compact(fullCompaction);
+                            }
+                        });
+    }
+
+    /**
+     * Runs one compaction round and reports which files were replaced by which.
+     *
+     * <p>Phase 1 sorts and rewrites every unsorted file on its own and refreshes the key index
+     * for it. Phase 2 merges groups taken from the sorted files listed before Phase 1 started: a
+     * single group holding all of them when {@code fullCompaction} is set, otherwise the groups
+     * chosen by {@link #pickMergeCandidates(List)}. Groups with fewer than two files are skipped.
+     *
+     * @param fullCompaction whether Phase 2 should merge all pre-existing sorted files together
+     * @return the result carrying the before/after file lists and the deletion file
+     */
+    private CompactResult compact(boolean fullCompaction) throws Exception {
+        RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
+        KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType);
+        RowType kvSchemaType = KeyValue.schema(keyType, valueType);
+
+        CompactResult result = new CompactResult();
+
+        // Both lists are fetched up front so Phase 2 only considers files that predate Phase 1.
+        List<DataFileMeta> level0Files = fileLevels.unsortedFiles();
+        List<DataFileMeta> preExistingSorted = fileLevels.sortedFiles();
+
+        // Phase 1: sort and rewrite each unsorted (level 0) file separately.
+        for (DataFileMeta level0File : level0Files) {
+            List<DataFileMeta> rewritten =
+                    sortAndRewriteFiles(singletonList(level0File), kvSerializer, kvSchemaType);
+            updateKeyIndex(keySerializer, level0File, rewritten);
+            result.before().add(level0File);
+            result.after().addAll(rewritten);
+        }
+
+        // Phase 2: universal compaction restricted to the pre-existing sorted files. Phase 1
+        // outputs stay out so that no file shows up in both result.before() and result.after().
+        List<List<DataFileMeta>> mergeGroups =
+                fullCompaction
+                        ? singletonList(preExistingSorted)
+                        : pickMergeCandidates(preExistingSorted);
+
+        for (List<DataFileMeta> group : mergeGroups) {
+            if (group.size() < 2) {
+                // A lone file has nothing to merge with.
+                continue;
+            }
+            List<DataFileMeta> merged = mergeAndRewriteFiles(group, keySerializer);
+            result.before().addAll(group);
+            result.after().addAll(merged);
+        }
+
+        CompactDeletionFile deletionFile;
+        if (lazyGenDeletionFile) {
+            deletionFile = CompactDeletionFile.lazyGeneration(dvMaintainer);
+        } else {
+            deletionFile = CompactDeletionFile.generateFiles(dvMaintainer);
+        }
+        result.setDeletionFile(deletionFile);
+        return result;
+    }
+
+    /**
+     * Chooses which sorted files should be merged together, driven by clustering key range
+     * overlap and by file size.
+     *
+     * <ol>
+     *   <li><b>Sectioning</b>: {@link #groupIntoSections(List)} orders the files by minKey and
+     *       puts files with overlapping clustering key ranges into the same section.
+     *   <li><b>Accumulation</b>: a section is accumulated when it holds two or more files or when
+     *       its total size is below {@code targetFileSize / 2}. Any other section (one file at or
+     *       above that size) is a barrier: the files accumulated so far become a merge group if
+     *       there are at least two of them, and accumulation starts over.
+     * </ol>
+     *
+     * @param sortedFiles all sorted files
+     * @return the merge groups, each with at least two files; empty when fewer than two files
+     *     are given
+     */
+    private List<List<DataFileMeta>> pickMergeCandidates(List<DataFileMeta> sortedFiles) {
+        if (sortedFiles.size() < 2) {
+            return java.util.Collections.emptyList();
+        }
+
+        long smallSectionThreshold = targetFileSize / 2;
+        List<List<DataFileMeta>> groups = new ArrayList<>();
+        List<DataFileMeta> accumulated = new ArrayList<>();
+
+        for (List<DataFileMeta> section : groupIntoSections(sortedFiles)) {
+            boolean largeSingleFile =
+                    section.size() < 2 && sectionSize(section) >= smallSectionThreshold;
+            if (!largeSingleFile) {
+                // Overlapping or small section: keep collecting.
+                accumulated.addAll(section);
+                continue;
+            }
+
+            // Barrier reached: emit what was collected (if mergeable) and start over.
+            if (accumulated.size() >= 2) {
+                groups.add(new ArrayList<>(accumulated));
+            }
+            accumulated.clear();
+        }
+
+        // Whatever is still collected after the last section forms the final group.
+        if (accumulated.size() >= 2) {
+            groups.add(accumulated);
+        }
+
+        return groups;
+    }
+
+    /** Returns the sum of {@code fileSize()} over all files of the section. */
+    private long sectionSize(List<DataFileMeta> section) {
+        return section.stream().mapToLong(DataFileMeta::fileSize).sum();
+    }
+
+    /**
+     * Group files into sections based on clustering column key range overlap. Files are first
+     * sorted by minKey, then a file joins the current section when its minKey is not greater than
+     * the largest maxKey seen in that section so far (ranges that merely touch count as
+     * overlapping); otherwise it starts a new section.
+     *
+     * @param files input files; the list itself is not modified and may be empty
+     * @return list of sections in ascending minKey order, each section containing overlapping
+     *     files; empty when {@code files} is empty
+     */
+    private List<List<DataFileMeta>> groupIntoSections(List<DataFileMeta> files) {
+        List<List<DataFileMeta>> sections = new ArrayList<>();
+        if (files.isEmpty()) {
+            // Nothing to group. Without this guard sorted.get(0) below would throw.
+            return sections;
+        }
+
+        // Sort a copy by minKey so overlapping ranges end up next to each other.
+        List<DataFileMeta> sorted = new ArrayList<>(files);
+        sorted.sort((a, b) -> clusteringComparatorAlone.compare(a.minKey(), b.minKey()));
+
+        List<DataFileMeta> currentSection = new ArrayList<>();
+        currentSection.add(sorted.get(0));
+        BinaryRow currentMax = sorted.get(0).maxKey();
+
+        for (int i = 1; i < sorted.size(); i++) {
+            DataFileMeta file = sorted.get(i);
+            if (clusteringComparatorAlone.compare(currentMax, file.minKey()) >= 0) {
+                // Overlaps with current section; extend the section's upper bound if needed.
+                currentSection.add(file);
+                if (clusteringComparatorAlone.compare(file.maxKey(), currentMax) > 0) {
+                    currentMax = file.maxKey();
+                }
+            } else {
+                // Gap between ranges: close the current section and open a new one.
+                sections.add(currentSection);
+                currentSection = new ArrayList<>();
+                currentSection.add(file);
+                currentMax = file.maxKey();
+            }
+        }
+        sections.add(currentSection);
+        return sections;
+    }
+
+    /**
+     * Single-file convenience overload: wraps {@code originalFile} in a one-element list and
+     * delegates to the list-based {@code updateKeyIndex}.
+     */
+    private void updateKeyIndex(
+            RowCompactedSerializer keySerializer,
+            DataFileMeta originalFile,
+            List<DataFileMeta> newSortedFiles)
+            throws Exception {
+        List<DataFileMeta> originals = singletonList(originalFile);
+        updateKeyIndex(keySerializer, originals, newSortedFiles);
+    }
+
+    /**
+     * Refreshes the key index after {@code originalFiles} were replaced by {@code newSortedFiles}.
+     *
+     * <p>Every key of every new file is looked up in the index. An existing entry only matters
+     * when its file is still known and is not one of the files being replaced:
+     *
+     * <ul>
+     *   <li>FIRST_ROW mode: the new row is marked deleted and the existing index entry is kept.
+     *   <li>DEDUPLICATE mode: the old row is marked deleted and the index moves to the new row.
+     * </ul>
+     *
+     * <p>In every other case the index entry is simply written for the new position.
+     */
+    private void updateKeyIndex(
+            RowCompactedSerializer keySerializer,
+            List<DataFileMeta> originalFiles,
+            List<DataFileMeta> newSortedFiles)
+            throws Exception {
+        // Entries pointing into a file that is being replaced must not trigger deletions.
+        java.util.Set<String> replacedNames = new java.util.HashSet<>();
+        for (DataFileMeta original : originalFiles) {
+            replacedNames.add(original.fileName());
+        }
+
+        for (DataFileMeta newFile : newSortedFiles) {
+            String newFileName = newFile.fileName();
+            int newFileId = fileLevels.getFileIdByName(newFileName);
+            int rowPosition = 0;
+            try (CloseableIterator<InternalRow> keys = readKeyIterator(newFile)) {
+                while (keys.hasNext()) {
+                    byte[] keyBytes = keySerializer.serializeToBytes(keys.next());
+                    byte[] indexed = kvDb.get(keyBytes);
+                    boolean keepIndexedEntry = false;
+
+                    if (indexed != null) {
+                        ByteArrayInputStream indexedIn = new ByteArrayInputStream(indexed);
+                        int indexedFileId = decodeInt(indexedIn);
+                        int indexedPosition = decodeInt(indexedIn);
+                        DataFileMeta indexedFile = fileLevels.getFileById(indexedFileId);
+                        boolean liveElsewhere =
+                                indexedFile != null
+                                        && !replacedNames.contains(indexedFile.fileName());
+                        if (liveElsewhere && firstRow) {
+                            // First-row mode: the earlier record wins, drop the new one.
+                            dvMaintainer.notifyNewDeletion(newFileName, rowPosition);
+                            keepIndexedEntry = true;
+                        } else if (liveElsewhere) {
+                            // Deduplicate mode: the new record wins, drop the earlier one.
+                            dvMaintainer.notifyNewDeletion(
+                                    indexedFile.fileName(), indexedPosition);
+                        }
+                    }
+
+                    if (!keepIndexedEntry) {
+                        ByteArrayOutputStream encoded = new ByteArrayOutputStream(8);
+                        encodeInt(encoded, newFileId);
+                        encodeInt(encoded, rowPosition);
+                        kvDb.put(keyBytes, encoded.toByteArray());
+                    }
+                    rowPosition++;
+                }
+            }
+        }
+    }
+
+    /**
+     * Sort and rewrite one or more unsorted files by clustering columns. 
Reads all KeyValue records
+     * from the input files, sorts them using an external sort buffer, and 
writes to new level-1
+     * files.
+     */
+    private List<DataFileMeta> sortAndRewriteFiles(
+            List<DataFileMeta> inputFiles, KeyValueSerializer kvSerializer, 
RowType kvSchemaType)
+            throws Exception {
+        int[] sortFieldsInKeyValue =
+                Arrays.stream(clusteringColumns)
+                        .map(i -> i + keyType.getFieldCount() + 2)
+                        .toArray();
+        BinaryExternalSortBuffer sortBuffer =
+                BinaryExternalSortBuffer.create(
+                        ioManager,
+                        kvSchemaType,
+                        sortFieldsInKeyValue,
+                        sortSpillBufferSize,
+                        pageSize,
+                        maxNumFileHandles,
+                        CompressOptions.defaultOptions(),
+                        MemorySize.MAX_VALUE,

Review Comment:
   The external sort buffer in Phase 1 uses `CompressOptions.defaultOptions()` 
and `MemorySize.MAX_VALUE` for spill, ignoring the configured spill compression 
/ spill disk size limits that are used elsewhere (e.g., `Sorter`). This can 
lead to unexpectedly high disk usage and inconsistent compression behavior. 
Consider wiring in the configured spill compression and a bounded max disk size 
option for the clustering compaction sort buffer.
   



##########
paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java:
##########
@@ -0,0 +1,712 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.separated;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.CloseableIterator;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.CLUSTERING_COLUMNS;
+import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
+import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.CoreOptions.PK_CLUSTERING_OVERRIDE;
+import static org.apache.paimon.CoreOptions.SORT_SPILL_THRESHOLD;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ClusteringTableTest {
+
+    @TempDir public java.nio.file.Path tempPath;
+
+    private Catalog catalog;
+    private Table table;
+    private IOManager ioManager;
+
+    /**
+     * Creates a catalog rooted at the temp directory, the {@code default} database and the table
+     * {@code default.t} (primary key {@code a}, clustering column {@code b}, one bucket, deletion
+     * vectors enabled), plus an IO manager on the same directory.
+     */
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        String root = tempPath.toString();
+        CatalogContext context = CatalogContext.create(new Path(root));
+        this.catalog = CatalogFactory.createCatalog(context);
+        catalog.createDatabase("default", true);
+
+        Schema tableSchema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .primaryKey("a")
+                        .option(DELETION_VECTORS_ENABLED.key(), "true")
+                        .option(BUCKET.key(), "1")
+                        .option(CLUSTERING_COLUMNS.key(), "b")
+                        .option(PK_CLUSTERING_OVERRIDE.key(), "true")
+                        .build();
+        Identifier tableId = Identifier.create("default", "t");
+        catalog.createTable(tableId, tableSchema, false);
+
+        this.table = catalog.getTable(tableId);
+        this.ioManager = IOManager.create(root);
+    }
+
+    /** Rewriting the same keys over several commits must expose only the latest value per key. */
+    @Test
+    public void testSameKeysMultipleTimesAcrossCommits() throws Exception {
+        // Five commits that keep rewriting keys 1-4
+        List<List<GenericRow>> commits =
+                Arrays.asList(
+                        Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 20)),
+                        Arrays.asList(GenericRow.of(1, 11), GenericRow.of(3, 30)),
+                        Arrays.asList(GenericRow.of(2, 22), GenericRow.of(4, 40)),
+                        Arrays.asList(GenericRow.of(1, 12), GenericRow.of(2, 23)),
+                        Arrays.asList(GenericRow.of(3, 33), GenericRow.of(4, 44)));
+        for (List<GenericRow> commit : commits) {
+            writeRows(table, commit);
+        }
+
+        // Deduplicate mode: the last value written for each key wins
+        List<GenericRow> latest =
+                Arrays.asList(
+                        GenericRow.of(1, 12),
+                        GenericRow.of(2, 23),
+                        GenericRow.of(3, 33),
+                        GenericRow.of(4, 44));
+        assertThat(readRows(table)).containsExactlyInAnyOrderElementsOf(latest);
+    }
+
+    /** Each round overwrites keys 1 and 2; a read right after must return exactly that round. */
+    @Test
+    public void testNormal() throws Exception {
+        List<List<GenericRow>> rounds =
+                Arrays.asList(
+                        Arrays.asList(GenericRow.of(1, 11), GenericRow.of(2, 22)),
+                        Arrays.asList(GenericRow.of(1, 111), GenericRow.of(2, 222)),
+                        Arrays.asList(GenericRow.of(1, 1111), GenericRow.of(2, 2222)));
+
+        for (List<GenericRow> round : rounds) {
+            writeRows(round);
+            assertThat(readRows()).containsExactlyInAnyOrderElementsOf(round);
+        }
+    }
+
+    /** Commits whose clustering ranges overlap must all remain fully readable. */
+    @Test
+    public void testOverlappingClusteringRanges() throws Exception {
+        // Clustering values 10-20
+        List<GenericRow> commit1 = Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 20));
+        // Clustering values 15-25, overlapping commit 1
+        List<GenericRow> commit2 = Arrays.asList(GenericRow.of(3, 15), GenericRow.of(4, 25));
+        // Clustering values 18-22, overlapping both earlier commits
+        List<GenericRow> commit3 = Arrays.asList(GenericRow.of(5, 18), GenericRow.of(6, 22));
+
+        writeRows(commit1);
+        writeRows(commit2);
+        writeRows(commit3);
+
+        // All keys are distinct, so every written row is expected back
+        List<GenericRow> expected = new ArrayList<>(commit1);
+        expected.addAll(commit2);
+        expected.addAll(commit3);
+        assertThat(readRows()).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    /** Commits whose clustering ranges are disjoint must all remain fully readable. */
+    @Test
+    public void testNonOverlappingClusteringRanges() throws Exception {
+        List<List<GenericRow>> commits =
+                Arrays.asList(
+                        // Clustering values 10-20
+                        Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 20)),
+                        // Clustering values 50-60, disjoint from the first commit
+                        Arrays.asList(GenericRow.of(3, 50), GenericRow.of(4, 60)),
+                        // Clustering values 100-110, disjoint from both
+                        Arrays.asList(GenericRow.of(5, 100), GenericRow.of(6, 110)));
+
+        List<GenericRow> expected = new ArrayList<>();
+        for (List<GenericRow> commit : commits) {
+            writeRows(commit);
+            expected.addAll(commit);
+        }
+
+        assertThat(readRows()).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    /** Test updating the same key multiple times with different clustering 
values. */
+    @Test
+    public void testMultipleUpdatesToSameKey() throws Exception {
+        // Initial write
+        writeRows(Arrays.asList(GenericRow.of(1, 100), GenericRow.of(2, 200)));
+        assertThat(readRows())
+                .containsExactlyInAnyOrder(GenericRow.of(1, 100), 
GenericRow.of(2, 200));
+
+        // Update key 1 with different clustering value
+        writeRows(Arrays.asList(GenericRow.of(1, 50)));
+        assertThat(readRows())
+                .containsExactlyInAnyOrder(GenericRow.of(1, 50), 
GenericRow.of(2, 200));
+
+        // Update key 1 again
+        writeRows(Arrays.asList(GenericRow.of(1, 150)));
+        assertThat(readRows())
+                .containsExactlyInAnyOrder(GenericRow.of(1, 150), 
GenericRow.of(2, 200));
+
+        // Update key 2
+        writeRows(Arrays.asList(GenericRow.of(2, 75)));
+        assertThat(readRows())
+                .containsExactlyInAnyOrder(GenericRow.of(1, 150), 
GenericRow.of(2, 75));
+    }
+
+    /** Two separate clusters of overlapping ranges must both remain fully readable. */
+    @Test
+    public void testMixedOverlapAndNonOverlap() throws Exception {
+        // Section A: 10-30, then 20-40 overlapping it
+        List<GenericRow> sectionA1 = Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 30));
+        List<GenericRow> sectionA2 = Arrays.asList(GenericRow.of(3, 20), GenericRow.of(4, 40));
+        // Section B: 100-120 (no overlap with A), then 110-130 overlapping it
+        List<GenericRow> sectionB1 = Arrays.asList(GenericRow.of(5, 100), GenericRow.of(6, 120));
+        List<GenericRow> sectionB2 = Arrays.asList(GenericRow.of(7, 110), GenericRow.of(8, 130));
+
+        List<GenericRow> expected = new ArrayList<>();
+        for (List<GenericRow> commit : Arrays.asList(sectionA1, sectionA2, sectionB1, sectionB2)) {
+            writeRows(commit);
+            expected.addAll(commit);
+        }
+
+        assertThat(readRows()).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    /** Ten commits of five distinct keys each; every written row must be readable afterwards. */
+    @Test
+    public void testManyWrites() throws Exception {
+        List<GenericRow> written = new ArrayList<>();
+
+        for (int batch = 0; batch < 10; batch++) {
+            // Batches cycle through three base values, so some clustering ranges overlap
+            int clusteringBase = (batch % 3) * 100;
+            int firstPk = batch * 5;
+
+            List<GenericRow> rows = new ArrayList<>();
+            for (int i = 0; i < 5; i++) {
+                rows.add(GenericRow.of(firstPk + i, clusteringBase + i * 10));
+            }
+            writeRows(rows);
+            written.addAll(rows);
+        }
+
+        assertThat(readRows()).containsExactlyInAnyOrderElementsOf(written);
+    }
+
+    /** Test updates that change clustering column causing record to move 
between sections. */
+    @Test
+    public void testUpdateChangesClusteringSection() throws Exception {
+        // Initial: key 1 in section [10-20]
+        writeRows(Arrays.asList(GenericRow.of(1, 15), GenericRow.of(2, 18)));
+
+        // Update: key 1 moves to section [100-120] (different section)
+        writeRows(Arrays.asList(GenericRow.of(1, 110)));
+
+        // Key 1 should now have clustering value 110, key 2 stays at 18
+        assertThat(readRows())
+                .containsExactlyInAnyOrder(GenericRow.of(1, 110), 
GenericRow.of(2, 18));
+
+        // Another update: key 1 moves back to a section near key 2
+        writeRows(Arrays.asList(GenericRow.of(1, 20)));
+
+        assertThat(readRows())
+                .containsExactlyInAnyOrder(GenericRow.of(1, 20), 
GenericRow.of(2, 18));
+    }
+
+    /** Test single row write and read. */
+    @Test
+    public void testSingleRow() throws Exception {
+        writeRows(Arrays.asList(GenericRow.of(1, 100)));
+        assertThat(readRows()).containsExactly(GenericRow.of(1, 100));
+
+        // Update single row
+        writeRows(Arrays.asList(GenericRow.of(1, 200)));
+        assertThat(readRows()).containsExactly(GenericRow.of(1, 200));
+    }
+
+    /** Boundary case: the second range starts right after the first one ends (21 &gt; 20). */
+    @Test
+    public void testAdjacentNonOverlappingRanges() throws Exception {
+        // Range [10, 20]
+        List<GenericRow> lower = Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 20));
+        // Range [21, 30]: adjacent to the first range without overlapping it
+        List<GenericRow> upper = Arrays.asList(GenericRow.of(3, 21), GenericRow.of(4, 30));
+
+        writeRows(lower);
+        writeRows(upper);
+
+        List<GenericRow> expected = new ArrayList<>(lower);
+        expected.addAll(upper);
+        assertThat(readRows()).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    /** Boundary case: the max of the first range equals the min of the second (20 and 20). */
+    @Test
+    public void testExactBoundaryOverlap() throws Exception {
+        // Range [10, 20]
+        List<GenericRow> lower = Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 20));
+        // Range [20, 30]: shares exactly the boundary value 20 with the first range
+        List<GenericRow> upper = Arrays.asList(GenericRow.of(3, 20), GenericRow.of(4, 30));
+
+        writeRows(lower);
+        writeRows(upper);
+
+        List<GenericRow> expected = new ArrayList<>(lower);
+        expected.addAll(upper);
+        assertThat(readRows()).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    /** Every record carries the same clustering value (50); all distinct keys must survive. */
+    @Test
+    public void testSameClusteringValue() throws Exception {
+        List<List<GenericRow>> commits =
+                Arrays.asList(
+                        Arrays.asList(GenericRow.of(1, 50), GenericRow.of(2, 50)),
+                        Arrays.asList(GenericRow.of(3, 50), GenericRow.of(4, 50)),
+                        Arrays.asList(GenericRow.of(5, 50)));
+
+        List<GenericRow> expected = new ArrayList<>();
+        for (List<GenericRow> commit : commits) {
+            writeRows(commit);
+            expected.addAll(commit);
+        }
+
+        assertThat(readRows()).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    /** Test delete by writing a key then updating to see old value is gone. */
+    @Test
+    public void testDeletionVectorCorrectness() throws Exception {
+        // Write initial data
+        writeRows(Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 20)));
+
+        // Write same keys with new values multiple times
+        for (int i = 0; i < 5; i++) {
+            writeRows(Arrays.asList(GenericRow.of(1, 10 + i * 10), 
GenericRow.of(2, 20 + i * 10)));
+        }
+
+        // Should only see the latest values
+        assertThat(readRows())
+                .containsExactlyInAnyOrder(GenericRow.of(1, 50), 
GenericRow.of(2, 60));
+    }
+
+    // ==================== First-Row Mode Tests ====================
+
+    /** First-row mode: a second write of an existing key must not replace the first record. */
+    @Test
+    public void testFirstRowBasic() throws Exception {
+        Table firstRowTable = createFirstRowTable();
+
+        // The first write establishes the values that must survive
+        List<GenericRow> firstWrite = Arrays.asList(GenericRow.of(1, 100), GenericRow.of(2, 200));
+        writeRows(firstRowTable, firstWrite);
+
+        // Same keys, different values: expected to be ignored
+        writeRows(firstRowTable, Arrays.asList(GenericRow.of(1, 999), GenericRow.of(2, 888)));
+
+        assertThat(readRows(firstRowTable))
+                .containsExactlyInAnyOrder(GenericRow.of(1, 100), GenericRow.of(2, 200));
+    }
+
+    /** First-row mode over several commits that mix duplicate keys with new keys. */
+    @Test
+    public void testFirstRowMultipleCommits() throws Exception {
+        Table firstRowTable = createFirstRowTable();
+
+        List<List<GenericRow>> commits =
+                Arrays.asList(
+                        // Commit 1: keys 1 and 2 appear for the first time
+                        Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 20)),
+                        // Commit 2: key 1 is a duplicate, key 3 is new
+                        Arrays.asList(GenericRow.of(1, 99), GenericRow.of(3, 30)),
+                        // Commit 3: key 2 is a duplicate, key 4 is new
+                        Arrays.asList(GenericRow.of(2, 88), GenericRow.of(4, 40)));
+        for (List<GenericRow> commit : commits) {
+            writeRows(firstRowTable, commit);
+        }
+
+        // Every key keeps the value of its first appearance
+        List<GenericRow> firstValues =
+                Arrays.asList(
+                        GenericRow.of(1, 10),
+                        GenericRow.of(2, 20),
+                        GenericRow.of(3, 30),
+                        GenericRow.of(4, 40));
+        assertThat(readRows(firstRowTable)).containsExactlyInAnyOrderElementsOf(firstValues);
+    }
+
+    /** First-row mode where the duplicate key arrives in a commit with an overlapping range. */
+    @Test
+    public void testFirstRowOverlappingRanges() throws Exception {
+        Table firstRowTable = createFirstRowTable();
+
+        // Clustering values 10-20
+        writeRows(firstRowTable, Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 20)));
+        // Clustering values 15-25: overlaps the first commit and repeats key 1
+        writeRows(firstRowTable, Arrays.asList(GenericRow.of(1, 15), GenericRow.of(3, 25)));
+
+        // Key 1 keeps its first value (10); keys 2 and 3 were written only once
+        List<GenericRow> expected =
+                Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 20), GenericRow.of(3, 25));
+        assertThat(readRows(firstRowTable)).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    /** The same two writes end differently: deduplicate keeps the last, first-row the first. */
+    @Test
+    public void testFirstRowVsDeduplicate() throws Exception {
+        GenericRow firstWrite = GenericRow.of(1, 100);
+        GenericRow secondWrite = GenericRow.of(1, 200);
+
+        // Default (deduplicate) table: the later write wins
+        writeRows(Arrays.asList(firstWrite));
+        writeRows(Arrays.asList(secondWrite));
+        assertThat(readRows()).containsExactly(secondWrite);
+
+        // First-row table: the earlier write wins
+        Table firstRowTable = createFirstRowTable();
+        writeRows(firstRowTable, Arrays.asList(firstWrite));
+        writeRows(firstRowTable, Arrays.asList(secondWrite));
+        assertThat(readRows(firstRowTable)).containsExactly(firstWrite);
+    }
+
+    /** First-row mode: rewrites of a key with other clustering values never replace it. */
+    @Test
+    public void testFirstRowUpdateChangesClusteringSection() throws Exception {
+        Table firstRowTable = createFirstRowTable();
+        GenericRow key1First = GenericRow.of(1, 15);
+        GenericRow key2First = GenericRow.of(2, 18);
+
+        // Both keys start close together (15 and 18)
+        writeRows(firstRowTable, Arrays.asList(GenericRow.of(1, 15), GenericRow.of(2, 18)));
+
+        // Key 1 is rewritten with a far-away clustering value; the write must be ignored
+        writeRows(firstRowTable, Arrays.asList(GenericRow.of(1, 110)));
+        assertThat(readRows(firstRowTable)).containsExactlyInAnyOrder(key1First, key2First);
+
+        // A second rewrite of key 1 must be ignored as well
+        writeRows(firstRowTable, Arrays.asList(GenericRow.of(1, 20)));
+        assertThat(readRows(firstRowTable)).containsExactlyInAnyOrder(key1First, key2First);
+    }
+
+    /** Test first-row mode DV correctness: rapid repeated writes mark new 
records, not old. */
+    @Test
+    public void testFirstRowDeletionVectorCorrectness() throws Exception {
+        Table firstRowTable = createFirstRowTable();
+
+        // Write initial data
+        writeRows(firstRowTable, Arrays.asList(GenericRow.of(1, 10), 
GenericRow.of(2, 20)));
+
+        // Write same keys with new values multiple times - all should be 
ignored
+        for (int i = 0; i < 5; i++) {
+            writeRows(
+                    firstRowTable,
+                    Arrays.asList(GenericRow.of(1, 10 + i * 10), 
GenericRow.of(2, 20 + i * 10)));
+        }
+
+        // Should still see the very first values
+        assertThat(readRows(firstRowTable))
+                .containsExactlyInAnyOrder(GenericRow.of(1, 10), 
GenericRow.of(2, 20));
+    }
+
+    /** Test first-row mode with many writes to trigger compaction. */
+    @Test
+    public void testFirstRowManyWrites() throws Exception {
+        Table firstRowTable = createFirstRowTable();
+
+        // First commit: establish initial values for keys 0-4
+        List<GenericRow> firstBatch = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            firstBatch.add(GenericRow.of(i, i * 10));
+        }
+        writeRows(firstRowTable, firstBatch);
+
+        // Subsequent commits: write same keys with different values
+        for (int batch = 1; batch < 10; batch++) {
+            List<GenericRow> batchRows = new ArrayList<>();
+            for (int i = 0; i < 5; i++) {
+                batchRows.add(GenericRow.of(i, i * 10 + batch * 100));
+            }
+            writeRows(firstRowTable, batchRows);
+        }
+
+        // Should only see the first values
+        
assertThat(readRows(firstRowTable)).containsExactlyInAnyOrderElementsOf(firstBatch);
+    }
+
+    // ==================== Spill Tests ====================
+
+    /**
+     * First-row mode on the table whose sort spill threshold is 2: every key keeps the value
+     * from the commit in which it first appeared, regardless of later commits on that key.
+     */
+    @Test
+    public void testFirstRowSpillBasic() throws Exception {
+        Table target = createFirstRowTableWithLowSpillThreshold();
+
+        // Five commits, applied in order. Keys 1 and 2 first appear in commit 1, key 3 in
+        // commit 2 and key 4 in commit 3; every later occurrence carries a different value.
+        List<List<GenericRow>> commits =
+                Arrays.asList(
+                        Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 20)),
+                        Arrays.asList(GenericRow.of(1, 99), GenericRow.of(3, 30)),
+                        Arrays.asList(GenericRow.of(2, 88), GenericRow.of(4, 40)),
+                        Arrays.asList(GenericRow.of(1, 77), GenericRow.of(2, 66)),
+                        Arrays.asList(GenericRow.of(3, 55), GenericRow.of(4, 44)));
+        for (List<GenericRow> rows : commits) {
+            writeRows(target, rows);
+        }
+
+        // One row per key, holding the value of the key's first appearance.
+        List<GenericRow> expected =
+                Arrays.asList(
+                        GenericRow.of(1, 10),
+                        GenericRow.of(2, 20),
+                        GenericRow.of(3, 30),
+                        GenericRow.of(4, 40));
+        assertThat(readRows(target)).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    /** Test first-row mode with spill and many writes to stress test. */
+    @Test
+    public void testFirstRowSpillManyWrites() throws Exception {
+        Table spillFirstRowTable = createFirstRowTableWithLowSpillThreshold();
+
+        // First commit: establish initial values for keys 0-4
+        List<GenericRow> firstBatch = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            firstBatch.add(GenericRow.of(i, i * 10));
+        }
+        writeRows(spillFirstRowTable, firstBatch);
+
+        // Subsequent commits: write same keys with different values
+        for (int batch = 1; batch < 10; batch++) {
+            List<GenericRow> batchRows = new ArrayList<>();
+            for (int i = 0; i < 5; i++) {
+                batchRows.add(GenericRow.of(i, i * 10 + batch * 100));
+            }
+            writeRows(spillFirstRowTable, batchRows);
+        }
+
+        // Should only see the first values
+        
assertThat(readRows(spillFirstRowTable)).containsExactlyInAnyOrderElementsOf(firstBatch);
+    }
+
+    /**
+     * Writes keys 1-4 repeatedly over five commits to the table with sort spill threshold 2 and
+     * expects the value of the last commit that touched each key.
+     */
+    @Test
+    public void testSameKeysMultipleTimesAcrossCommitsInSpill() throws Exception {
+        Table target = createTableWithLowSpillThreshold();
+
+        // Commits in write order; each key occurs in at least two of them.
+        List<List<GenericRow>> commits =
+                Arrays.asList(
+                        Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 20)),
+                        Arrays.asList(GenericRow.of(1, 11), GenericRow.of(3, 30)),
+                        Arrays.asList(GenericRow.of(2, 22), GenericRow.of(4, 40)),
+                        Arrays.asList(GenericRow.of(1, 12), GenericRow.of(2, 23)),
+                        Arrays.asList(GenericRow.of(3, 33), GenericRow.of(4, 44)));
+        for (List<GenericRow> rows : commits) {
+            writeRows(target, rows);
+        }
+
+        // Latest value per key: key 1 and 2 from commit 4, key 3 and 4 from commit 5.
+        List<GenericRow> latest =
+                Arrays.asList(
+                        GenericRow.of(1, 12),
+                        GenericRow.of(2, 23),
+                        GenericRow.of(3, 33),
+                        GenericRow.of(4, 44));
+        assertThat(readRows(target)).containsExactlyInAnyOrderElementsOf(latest);
+    }
+
+    /**
+     * Test row-based spill when merging many files. Uses a low spillThreshold 
to trigger spill
+     * logic with fewer files.
+     */
+    @Test
+    public void testSpillWithManyFiles() throws Exception {
+        Table spillTable = createTableWithLowSpillThreshold();
+
+        // Write many separate commits to create many files
+        // Each commit creates a new file, and we want to exceed 
spillThreshold (set to 2)
+        List<GenericRow> allRows = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            List<GenericRow> batch =
+                    Arrays.asList(
+                            GenericRow.of(i * 2, i * 10), GenericRow.of(i * 2 
+ 1, i * 10 + 5));
+            writeRows(spillTable, batch);
+            allRows.addAll(batch);
+        }
+
+        // All rows should be readable after compaction with spill
+        
assertThat(readRows(spillTable)).containsExactlyInAnyOrderElementsOf(allRows);
+    }
+
+    /**
+     * Five commits whose values of the clustering column {@code b} span overlapping ranges
+     * (10-20, 15-25, 12-22, 18-28, 14-24) on the table with sort spill threshold 2; all ten
+     * rows, each with its own key, must be read back.
+     */
+    @Test
+    public void testSpillWithOverlappingRanges() throws Exception {
+        Table target = createTableWithLowSpillThreshold();
+
+        // Rows in write order; every consecutive pair forms one commit.
+        List<GenericRow> expected =
+                Arrays.asList(
+                        GenericRow.of(1, 10),
+                        GenericRow.of(2, 20),
+                        GenericRow.of(3, 15),
+                        GenericRow.of(4, 25),
+                        GenericRow.of(5, 12),
+                        GenericRow.of(6, 22),
+                        GenericRow.of(7, 18),
+                        GenericRow.of(8, 28),
+                        GenericRow.of(9, 14),
+                        GenericRow.of(10, 24));
+        for (int start = 0; start < expected.size(); start += 2) {
+            writeRows(target, new ArrayList<>(expected.subList(start, start + 2)));
+        }
+
+        assertThat(readRows(target)).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    /**
+     * Two rounds of two commits each on the table with sort spill threshold 2: round one inserts
+     * keys 1-4, round two rewrites them, and a read after each round must show that round's
+     * values.
+     */
+    @Test
+    public void testSpillWithDeduplication() throws Exception {
+        Table target = createTableWithLowSpillThreshold();
+
+        // Round one: keys 1-4 spread over two commits.
+        List<GenericRow> initialLow = Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 20));
+        List<GenericRow> initialHigh = Arrays.asList(GenericRow.of(3, 30), GenericRow.of(4, 40));
+        writeRows(target, initialLow);
+        writeRows(target, initialHigh);
+
+        List<GenericRow> afterRoundOne = new ArrayList<>(initialLow);
+        afterRoundOne.addAll(initialHigh);
+        assertThat(readRows(target)).containsExactlyInAnyOrderElementsOf(afterRoundOne);
+
+        // Round two: the same keys again, each with a new value.
+        List<GenericRow> updatedLow = Arrays.asList(GenericRow.of(1, 11), GenericRow.of(2, 22));
+        List<GenericRow> updatedHigh = Arrays.asList(GenericRow.of(3, 33), GenericRow.of(4, 44));
+        writeRows(target, updatedLow);
+        writeRows(target, updatedHigh);
+
+        // Only the round-two value of every key may remain visible.
+        List<GenericRow> afterRoundTwo = new ArrayList<>(updatedLow);
+        afterRoundTwo.addAll(updatedHigh);
+        assertThat(readRows(target)).containsExactlyInAnyOrderElementsOf(afterRoundTwo);
+    }
+
+    /**
+     * Creates table {@code default.first_row_spill_table} in {@code catalog} and returns it.
+     *
+     * <p>Schema: INT columns {@code a} (primary key) and {@code b}; one bucket, deletion vectors
+     * enabled, clustering on {@code b} with the PK clustering override, merge engine {@code
+     * first-row} and a sort spill threshold of 2.
+     */
+    private Table createFirstRowTableWithLowSpillThreshold() throws Exception {
+        Identifier tableId = Identifier.create("default", "first_row_spill_table");
+        catalog.createTable(
+                tableId,
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .primaryKey("a")
+                        .option(DELETION_VECTORS_ENABLED.key(), "true")
+                        .option(BUCKET.key(), "1")
+                        .option(CLUSTERING_COLUMNS.key(), "b")
+                        .option(PK_CLUSTERING_OVERRIDE.key(), "true")
+                        .option(MERGE_ENGINE.key(), "first-row")
+                        .option(SORT_SPILL_THRESHOLD.key(), "2")
+                        .build(),
+                false);
+        return catalog.getTable(tableId);
+    }
+
+    /**
+     * Creates table {@code default.spill_table} in {@code catalog} and returns it.
+     *
+     * <p>Schema: INT columns {@code a} (primary key) and {@code b}; one bucket, deletion vectors
+     * enabled, clustering on {@code b} with the PK clustering override. No merge engine is set
+     * here. The sort spill threshold is set to 2, deliberately low so that spilling is triggered
+     * by only a few files.
+     */
+    private Table createTableWithLowSpillThreshold() throws Exception {
+        Identifier tableId = Identifier.create("default", "spill_table");
+        catalog.createTable(
+                tableId,
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .primaryKey("a")
+                        .option(DELETION_VECTORS_ENABLED.key(), "true")
+                        .option(BUCKET.key(), "1")
+                        .option(CLUSTERING_COLUMNS.key(), "b")
+                        .option(PK_CLUSTERING_OVERRIDE.key(), "true")
+                        .option(SORT_SPILL_THRESHOLD.key(), "2")
+                        .build(),
+                false);
+        return catalog.getTable(tableId);
+    }
+
+    /**
+     * Creates table {@code default.first_row_table} in {@code catalog} and returns it.
+     *
+     * <p>Schema: INT columns {@code a} (primary key) and {@code b}; one bucket, deletion vectors
+     * enabled, clustering on {@code b} with the PK clustering override and merge engine {@code
+     * first-row}. No sort spill threshold is set here.
+     */
+    private Table createFirstRowTable() throws Exception {
+        Identifier tableId = Identifier.create("default", "first_row_table");
+        catalog.createTable(
+                tableId,
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .primaryKey("a")
+                        .option(DELETION_VECTORS_ENABLED.key(), "true")
+                        .option(BUCKET.key(), "1")
+                        .option(CLUSTERING_COLUMNS.key(), "b")
+                        .option(PK_CLUSTERING_OVERRIDE.key(), "true")
+                        .option(MERGE_ENGINE.key(), "first-row")
+                        .build(),
+                false);
+        return catalog.getTable(tableId);
+    }
+
+    /** Writes {@code rows} to {@code table} by delegating to {@link #writeRows(Table, List)}. */
+    private void writeRows(List<GenericRow> rows) throws Exception {
+        writeRows(table, rows);
+    }
+
+    /**
+     * Writes {@code rows} to {@code targetTable} through a fresh batch write that uses the test's
+     * {@code ioManager}, then commits them in a single commit. Both the write and the commit are
+     * closed when the method exits.
+     *
+     * @param targetTable table that receives the rows
+     * @param rows rows to write, in list order
+     * @throws Exception if a write, the commit or closing a resource fails
+     */
+    private void writeRows(Table targetTable, List<GenericRow> rows) throws Exception {
+        BatchWriteBuilder builder = targetTable.newBatchWriteBuilder();
+        try (BatchTableWrite writer = builder.newWrite().withIOManager(ioManager);
+                BatchTableCommit committer = builder.newCommit()) {
+            for (int i = 0; i < rows.size(); i++) {
+                writer.write(rows.get(i));
+            }
+            // Everything written above goes into one commit.
+            committer.commit(writer.prepareCommit());
+        }
+    }
+
+    /** Reads the rows of {@code table} by delegating to {@link #readRows(Table)}. */
+    private List<GenericRow> readRows() throws Exception {
+        return readRows(table);
+    }
+
+    private List<GenericRow> readRows(Table targetTable) throws Exception {
+        ReadBuilder readBuilder = targetTable.newReadBuilder();
+        @SuppressWarnings("resource")
+        CloseableIterator<InternalRow> iterator =
+                readBuilder
+                        .newRead()
+                        .createReader(readBuilder.newScan().plan())
+                        .toCloseableIterator();
+        List<GenericRow> result = new ArrayList<>();
+        while (iterator.hasNext()) {
+            InternalRow row = iterator.next();
+            result.add(GenericRow.of(row.getInt(0), row.getInt(1)));

Review Comment:
   `readRows` builds a `CloseableIterator` but never closes it (suppressed as a 
resource warning). This can leak file handles / temp resources and make the 
test flaky on some platforms. Prefer try-with-resources around the iterator (or 
close it in a finally block).
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to