This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit ec5078e112173fe5ec586738df7d7dfb508b02a3
Author: Junbo Wang <[email protected]>
AuthorDate: Thu Jan 29 20:18:27 2026 +0800

    [kv] Implement lookup with insertIfNotExists on tablet server (#2485)
---
 .../java/org/apache/fluss/metadata/Schema.java     |  11 +
 .../org/apache/fluss/record/KeyRecordBatch.java    | 251 ++++++++++++++++
 .../org/apache/fluss/record/KvRecordBatch.java     |   4 +
 .../apache/fluss/record/KvRecordReadContext.java   |   5 +
 .../fluss/row/decode/CompactedKeyDecoder.java      |  84 ++++++
 .../org/apache/fluss/row/decode/KeyDecoder.java    |  79 +++++
 .../row/decode/iceberg/IcebergKeyDecoder.java      | 143 +++++++++
 .../fluss/row/decode/paimon/PaimonKeyDecoder.java  | 210 ++++++++++++++
 .../row/encode/paimon/PaimonBinaryRowWriter.java   |   1 +
 .../apache/fluss/record/KeyRecordBatchTest.java    | 194 +++++++++++++
 .../java/org/apache/fluss/record/TestData.java     |  28 ++
 .../fluss/row/decode/CompactedKeyDecoderTest.java  | 319 +++++++++++++++++++++
 .../row/decode/iceberg/IcebergKeyDecoderTest.java  | 212 ++++++++++++++
 .../row/decode/paimon/PaimonKeyDecoderTest.java    | 227 +++++++++++++++
 .../row/encode/paimon/PaimonKeyEncoderTest.java    |   4 +-
 .../org/apache/fluss/testutils/DataTestUtils.java  |  14 +
 fluss-rpc/src/main/proto/FlussApi.proto            |   3 +
 .../java/org/apache/fluss/server/kv/KvTablet.java  |  23 +-
 .../fluss/server/replica/ReplicaManager.java       | 151 +++++++++-
 .../apache/fluss/server/tablet/TabletService.java  |  33 ++-
 .../fluss/server/replica/ReplicaManagerTest.java   | 300 +++++++++++++++++++
 .../fluss/server/replica/ReplicaTestBase.java      |  10 +
 .../fluss/server/tablet/TabletServiceITCase.java   |  99 +++++++
 .../server/testutils/RpcMessageTestUtils.java      |  21 ++
 fluss-test-coverage/pom.xml                        |   1 +
 25 files changed, 2411 insertions(+), 16 deletions(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
index 92b675c42..a97f09ee9 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
@@ -151,6 +151,17 @@ public final class Schema implements Serializable {
                 .orElseGet(() -> new int[0]);
     }
 
+    /**
+     * Returns column indexes excluding auto-increment columns, used for 
partial update operations.
+     */
+    public @Nullable int[] getNonAutoIncrementColumnIndexes() {
+        if (autoIncrementColumnNames.isEmpty()) {
+            return null;
+        }
+        int autoIncIdx = getColumnIndexes(autoIncrementColumnNames)[0];
+        return IntStream.range(0, columns.size()).filter(i -> i != 
autoIncIdx).toArray();
+    }
+
     /** Returns the auto-increment columnIds, if any, otherwise returns an 
empty array. */
     public int[] getAutoIncrementColumnIds() {
         if (autoIncrementColumnNames.isEmpty()) {
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/KeyRecordBatch.java 
b/fluss-common/src/main/java/org/apache/fluss/record/KeyRecordBatch.java
new file mode 100644
index 000000000..6cf684ee1
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/record/KeyRecordBatch.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.TableConfig;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.row.BinaryRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.decode.KeyDecoder;
+import org.apache.fluss.row.encode.RowEncoder;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
+
+/**
+ * A lazy-decoding KvRecordBatch that stores key bytes and converts them to 
KvRecords on-demand
+ * during iteration.
+ *
+ * <p>When iterating via {@link #records(ReadContext)}, each key is decoded 
and a full row is
+ * constructed with key fields populated and non-key fields set to null. This 
avoids upfront
+ * full-batch decoding, improving memory efficiency.
+ */
+public class KeyRecordBatch implements KvRecordBatch {
+
+    private final List<byte[]> keyBytes;
+    private final short schemaId;
+    private final KvFormat kvFormat;
+    private final short kvFormatVersion;
+    private final boolean defaultBucketKey;
+    private final @Nullable DataLakeFormat lakeFormat;
+
+    public static KeyRecordBatch create(List<byte[]> keyBytes, TableInfo 
tableInfo) {
+        TableConfig tableConfig = tableInfo.getTableConfig();
+        KvFormat kvFormat = tableConfig.getKvFormat();
+        short kvFormatVersion = 
tableConfig.getKvFormatVersion().orElse(1).shortValue();
+        short schemaId = (short) tableInfo.getSchemaId();
+        boolean defaultBucketKey = tableInfo.isDefaultBucketKey();
+        return new KeyRecordBatch(
+                keyBytes,
+                kvFormat,
+                kvFormatVersion,
+                schemaId,
+                defaultBucketKey,
+                tableConfig.getDataLakeFormat().orElse(null));
+    }
+
+    @VisibleForTesting
+    KeyRecordBatch(List<byte[]> keyBytes, KvFormat kvFormat, short schemaId) {
+        this(keyBytes, kvFormat, (short) 1, schemaId, true, null);
+    }
+
+    /**
+     * Creates a KeyRecordBatch for lazy KvRecord construction.
+     *
+     * @param keyBytes the list of encoded key bytes
+     */
+    public KeyRecordBatch(
+            List<byte[]> keyBytes,
+            KvFormat kvFormat,
+            short kvFormatVersion,
+            short schemaId,
+            boolean defaultBucketKey,
+            @Nullable DataLakeFormat lakeFormat) {
+        this.keyBytes = keyBytes;
+        this.schemaId = schemaId;
+        this.kvFormat = kvFormat;
+        this.kvFormatVersion = kvFormatVersion;
+        this.defaultBucketKey = defaultBucketKey;
+        this.lakeFormat = lakeFormat;
+    }
+
+    @Override
+    public Iterable<KvRecord> records(ReadContext readContext) {
+        return () -> new LazyKvRecordIterator(readContext);
+    }
+
+    @Override
+    public int getRecordCount() {
+        return keyBytes.size();
+    }
+
+    @Override
+    public short schemaId() {
+        return schemaId;
+    }
+
+    @Override
+    public boolean isValid() {
+        return true;
+    }
+
+    @Override
+    public void ensureValid() {
+        // In-memory batch is always valid
+    }
+
+    @Override
+    public long checksum() {
+        return 0;
+    }
+
+    @Override
+    public byte magic() {
+        return CURRENT_KV_MAGIC_VALUE;
+    }
+
+    @Override
+    public long writerId() {
+        return NO_WRITER_ID;
+    }
+
+    @Override
+    public int batchSequence() {
+        return NO_BATCH_SEQUENCE;
+    }
+
+    @Override
+    public int sizeInBytes() {
+        return keyBytes.stream().mapToInt(k -> k.length).sum();
+    }
+
+    /** Lazy iterator that constructs KvRecord on-demand during iteration. */
+    private class LazyKvRecordIterator implements Iterator<KvRecord> {
+        private int index = 0;
+
+        private final int fieldCount;
+        private final int[] keyIdxByFieldIdx; // Maps field index to key index 
(-1 if not a key)
+        private final InternalRow.FieldGetter[] keyFieldGetters;
+        private final RowEncoder rowEncoder;
+        private final KeyDecoder keyDecoder;
+
+        LazyKvRecordIterator(ReadContext context) {
+            SchemaGetter schemaGetter = context.getSchemaGetter();
+            Schema schema = schemaGetter.getSchema(schemaId);
+            RowType rowType = schema.getRowType();
+            List<String> keyColumnNames = schema.getPrimaryKeyColumnNames();
+
+            this.fieldCount = rowType.getFieldCount();
+            int[] keyFieldIndexes = schema.getPrimaryKeyIndexes();
+            this.keyDecoder =
+                    KeyDecoder.ofPrimaryKeyDecoder(
+                            rowType, keyColumnNames, kvFormatVersion, 
lakeFormat, defaultBucketKey);
+            this.keyFieldGetters = new 
InternalRow.FieldGetter[keyColumnNames.size()];
+
+            // Build reverse mapping: field index -> key index
+            this.keyIdxByFieldIdx = new int[fieldCount];
+            for (int i = 0; i < fieldCount; i++) {
+                keyIdxByFieldIdx[i] = -1;
+            }
+            for (int keyIdx = 0; keyIdx < keyFieldIndexes.length; keyIdx++) {
+                keyIdxByFieldIdx[keyFieldIndexes[keyIdx]] = keyIdx;
+            }
+
+            // Create field getters for key fields (reading from decoded key 
row at position i)
+            for (int i = 0; i < keyColumnNames.size(); i++) {
+                DataType keyFieldType = rowType.getTypeAt(keyFieldIndexes[i]);
+                keyFieldGetters[i] = 
InternalRow.createFieldGetter(keyFieldType, i);
+            }
+
+            this.rowEncoder = RowEncoder.create(kvFormat, rowType);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return index < keyBytes.size();
+        }
+
+        @Override
+        public KvRecord next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            byte[] key = keyBytes.get(index++);
+
+            InternalRow keyRow = keyDecoder.decodeKey(key);
+
+            // Build full row: key fields from decoded values, non-key fields 
as null
+            rowEncoder.startNewRow();
+            for (int i = 0; i < fieldCount; i++) {
+                int keyIdx = keyIdxByFieldIdx[i];
+                if (keyIdx != -1) {
+                    // This is a key field - copy from decoded key row
+                    Object value = 
keyFieldGetters[keyIdx].getFieldOrNull(keyRow);
+                    rowEncoder.encodeField(i, value);
+                } else {
+                    // Non-key field - set to null
+                    rowEncoder.encodeField(i, null);
+                }
+            }
+            BinaryRow binaryRow = rowEncoder.finishRow();
+
+            return new KeyOnlyKvRecord(key, binaryRow);
+        }
+    }
+
+    /** Simple KvRecord implementation for key-only records. */
+    private static class KeyOnlyKvRecord implements KvRecord {
+        private final byte[] key;
+        private final BinaryRow row;
+
+        KeyOnlyKvRecord(byte[] key, BinaryRow row) {
+            this.key = key;
+            this.row = row;
+        }
+
+        @Override
+        public ByteBuffer getKey() {
+            return ByteBuffer.wrap(key);
+        }
+
+        @Override
+        public BinaryRow getRow() {
+            return row;
+        }
+
+        @Override
+        public int getSizeInBytes() {
+            return key.length + (row != null ? row.getSizeInBytes() : 0);
+        }
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatch.java 
b/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatch.java
index 66c999c28..0689ecadd 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatch.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatch.java
@@ -18,6 +18,7 @@
 package org.apache.fluss.record;
 
 import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.metadata.SchemaGetter;
 import org.apache.fluss.row.decode.RowDecoder;
 
 /**
@@ -107,6 +108,9 @@ public interface KvRecordBatch {
     /** The read context of a {@link KvRecordBatch} to read records. */
     interface ReadContext {
 
+        /** Gets the schema getter to retrieve schema for decoding record 
bytes. */
+        SchemaGetter getSchemaGetter();
+
         /**
          * Gets the row decoder for the given schema to decode bytes read from 
{@link
          * KvRecordBatch}.
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/KvRecordReadContext.java 
b/fluss-common/src/main/java/org/apache/fluss/record/KvRecordReadContext.java
index 5fffc3f4f..7e1878fc4 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/KvRecordReadContext.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/KvRecordReadContext.java
@@ -44,6 +44,11 @@ public class KvRecordReadContext implements 
KvRecordBatch.ReadContext {
         return new KvRecordReadContext(kvFormat, schemaGetter);
     }
 
+    @Override
+    public SchemaGetter getSchemaGetter() {
+        return schemaGetter;
+    }
+
     @Override
     public RowDecoder getRowDecoder(int schemaId) {
         return rowDecoderCache.computeIfAbsent(
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/decode/CompactedKeyDecoder.java
 
b/fluss-common/src/main/java/org/apache/fluss/row/decode/CompactedKeyDecoder.java
new file mode 100644
index 000000000..bebc210bc
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/decode/CompactedKeyDecoder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.decode;
+
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.compacted.CompactedRowDeserializer;
+import org.apache.fluss.row.compacted.CompactedRowReader;
+import org.apache.fluss.row.encode.CompactedKeyEncoder;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.RowType;
+
+import java.util.List;
+
+/**
+ * A decoder to decode key bytes (encoded by {@link CompactedKeyEncoder}) back 
to {@link
+ * InternalRow}.
+ */
+public class CompactedKeyDecoder implements KeyDecoder {
+
+    private final DataType[] keyDataTypes;
+
+    /**
+     * Create a key decoder to decode the key bytes back to a row containing 
only key fields.
+     *
+     * @param rowType the row type used to locate key field positions
+     * @param keys the key field names to decode
+     */
+    public static CompactedKeyDecoder createKeyDecoder(RowType rowType, 
List<String> keys) {
+        int[] keyFieldPos = new int[keys.size()];
+        for (int i = 0; i < keys.size(); i++) {
+            keyFieldPos[i] = rowType.getFieldIndex(keys.get(i));
+            if (keyFieldPos[i] == -1) {
+                throw new IllegalArgumentException(
+                        "Field " + keys.get(i) + " not found in row type " + 
rowType);
+            }
+        }
+        return new CompactedKeyDecoder(rowType, keyFieldPos);
+    }
+
+    public CompactedKeyDecoder(RowType rowType, int[] keyFieldPos) {
+        // for decoding key fields
+        keyDataTypes = new DataType[keyFieldPos.length];
+        for (int i = 0; i < keyFieldPos.length; i++) {
+            keyDataTypes[i] = rowType.getTypeAt(keyFieldPos[i]).copy(false);
+        }
+    }
+
+    /**
+     * Decode the key bytes back to a row containing only key fields. Non-key 
fields are not
+     * included in the returned row.
+     *
+     * @param keyBytes the key bytes encoded by {@link CompactedKeyEncoder}
+     * @return a row containing only the decoded key fields
+     */
+    public InternalRow decodeKey(byte[] keyBytes) {
+        // Decode key fields directly into a GenericRow matching the decoded 
key size
+        CompactedRowReader compactedRowReader = new CompactedRowReader(0);
+        compactedRowReader.pointTo(MemorySegment.wrap(keyBytes), 0, 
keyBytes.length);
+
+        CompactedRowDeserializer compactedRowDeserializer =
+                new CompactedRowDeserializer(keyDataTypes);
+        GenericRow genericRow = new GenericRow(keyDataTypes.length);
+        compactedRowDeserializer.deserialize(compactedRowReader, genericRow);
+        return genericRow;
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/decode/KeyDecoder.java 
b/fluss-common/src/main/java/org/apache/fluss/row/decode/KeyDecoder.java
new file mode 100644
index 000000000..770526879
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/row/decode/KeyDecoder.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.decode;
+
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.decode.iceberg.IcebergKeyDecoder;
+import org.apache.fluss.row.decode.paimon.PaimonKeyDecoder;
+import org.apache.fluss.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * Interface for decoding key bytes back to {@link InternalRow}.
+ *
+ * <p>This interface provides functionality to decode binary key bytes into 
internal row
+ * representation, typically used for primary key decoding in KV tables.
+ */
+public interface KeyDecoder {
+    /** Decode key bytes to a row containing only key fields, without non-key 
fields. */
+    InternalRow decodeKey(byte[] keyBytes);
+
+    /**
+     * Creates a primary key decoder based on KV format version and data lake 
format.
+     *
+     * <p>Behavior aligns with {@link 
org.apache.fluss.row.encode.KeyEncoder#ofPrimaryKeyEncoder}.
+     *
+     * @param rowType the row type containing all fields
+     * @param keyFields the list of primary key field names
+     * @param kvFormatVersion the KV format version (1 or 2)
+     * @param lakeFormat the data lake format, null if not using lake storage
+     * @param isDefaultBucketKey whether using default bucket key (primary key 
as bucket key)
+     * @return the corresponding key decoder
+     */
+    static KeyDecoder ofPrimaryKeyDecoder(
+            RowType rowType,
+            List<String> keyFields,
+            short kvFormatVersion,
+            @Nullable DataLakeFormat lakeFormat,
+            boolean isDefaultBucketKey) {
+        if (kvFormatVersion == 1 || (kvFormatVersion == 2 && 
isDefaultBucketKey)) {
+            if (lakeFormat == null || lakeFormat == DataLakeFormat.LANCE) {
+                return CompactedKeyDecoder.createKeyDecoder(rowType, 
keyFields);
+            }
+            if (lakeFormat == DataLakeFormat.PAIMON) {
+                return new PaimonKeyDecoder(rowType, keyFields);
+            }
+            if (lakeFormat == DataLakeFormat.ICEBERG) {
+                return new IcebergKeyDecoder(rowType, keyFields);
+            }
+            throw new UnsupportedOperationException(
+                    "Unsupported datalake format for key decoding: " + 
lakeFormat);
+        }
+        if (kvFormatVersion == 2) {
+            // use CompactedKeyEncoder to support prefix look up
+            return CompactedKeyDecoder.createKeyDecoder(rowType, keyFields);
+        }
+        throw new UnsupportedOperationException(
+                "Unsupported kv format version: " + kvFormatVersion);
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/decode/iceberg/IcebergKeyDecoder.java
 
b/fluss-common/src/main/java/org/apache/fluss/row/decode/iceberg/IcebergKeyDecoder.java
new file mode 100644
index 000000000..dd746e986
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/decode/iceberg/IcebergKeyDecoder.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.decode.iceberg;
+
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.row.decode.KeyDecoder;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.RowType;
+
+import java.util.List;
+
+import static org.apache.fluss.types.DataTypeChecks.getPrecision;
+import static org.apache.fluss.types.DataTypeChecks.getScale;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/**
+ * Decoder for Iceberg-encoded key bytes.
+ *
+ * <p>This decoder reverses the encoding performed by {@link
+ * org.apache.fluss.row.encode.iceberg.IcebergKeyEncoder}.
+ *
+ * <p>The binary format follows Iceberg's encoding strategy:
+ *
+ * <ul>
+ *   <li>Only single key field is supported
+ *   <li>Integer types are encoded as long (8 bytes) in LITTLE_ENDIAN
+ *   <li>Time is encoded as microseconds (long, 8 bytes)
+ *   <li>Timestamp is encoded as microseconds (long, 8 bytes)
+ *   <li>Strings/bytes/decimals have NO length prefix (direct binary data)
+ *   <li>Decimals use BIG_ENDIAN byte order
+ * </ul>
+ */
+public class IcebergKeyDecoder implements KeyDecoder {
+
+    private final FieldReader fieldReader;
+
+    public IcebergKeyDecoder(RowType rowType, List<String> keys) {
+        checkArgument(
+                keys.size() == 1,
+                "Key fields must have exactly one field for iceberg format, 
but got: %s",
+                keys);
+
+        int keyIndex = rowType.getFieldIndex(keys.get(0));
+        DataType keyDataType = rowType.getTypeAt(keyIndex);
+        this.fieldReader = createFieldReader(keyDataType);
+    }
+
+    @Override
+    public InternalRow decodeKey(byte[] keyBytes) {
+        MemorySegment segment = MemorySegment.wrap(keyBytes);
+        GenericRow row = new GenericRow(1);
+        Object value = fieldReader.readField(segment, 0);
+        row.setField(0, value);
+        return row;
+    }
+
+    private FieldReader createFieldReader(DataType fieldType) {
+        switch (fieldType.getTypeRoot()) {
+            case INTEGER:
+            case DATE:
+                // Integer is encoded as long (8 bytes)
+                return (segment, offset) -> (int) segment.getLong(offset);
+
+            case TIME_WITHOUT_TIME_ZONE:
+                // Time encoded as microseconds long
+                return (segment, offset) -> {
+                    long micros = segment.getLong(offset);
+                    return (int) (micros / 1000L);
+                };
+
+            case BIGINT:
+                return MemorySegment::getLong;
+
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return (segment, offset) -> {
+                    long micros = segment.getLong(offset);
+                    long millis = micros / 1000L;
+                    int nanoOfMillis = (int) ((micros % 1000L) * 1000L);
+                    return TimestampNtz.fromMillis(millis, nanoOfMillis);
+                };
+
+            case DECIMAL:
+                final int decimalPrecision = getPrecision(fieldType);
+                final int decimalScale = getScale(fieldType);
+                return (segment, offset) -> {
+                    // Decimal bytes are written directly without length prefix
+                    int length = segment.size() - offset;
+                    byte[] bytes = new byte[length];
+                    segment.get(offset, bytes, 0, length);
+                    return Decimal.fromUnscaledBytes(bytes, decimalPrecision, 
decimalScale);
+                };
+
+            case STRING:
+            case CHAR:
+                return (segment, offset) -> {
+                    // String bytes are written directly without length prefix
+                    int length = segment.size() - offset;
+                    byte[] bytes = new byte[length];
+                    segment.get(offset, bytes, 0, length);
+                    return BinaryString.fromBytes(bytes);
+                };
+
+            case BINARY:
+            case BYTES:
+                return (segment, offset) -> {
+                    // Binary bytes are written directly without length prefix
+                    int length = segment.size() - offset;
+                    byte[] bytes = new byte[length];
+                    segment.get(offset, bytes, 0, length);
+                    return bytes;
+                };
+
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported type for Iceberg key decoder: " + 
fieldType);
+        }
+    }
+
+    /** Accessor for reading fields from Iceberg binary format. */
+    interface FieldReader {
+        Object readField(MemorySegment segment, int offset);
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoder.java
 
b/fluss-common/src/main/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoder.java
new file mode 100644
index 000000000..f9a319022
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoder.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.decode.paimon;
+
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.row.BinarySegmentUtils;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.row.decode.KeyDecoder;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.RowType;
+
+import java.util.List;
+
+import static org.apache.fluss.types.DataTypeChecks.getPrecision;
+import static org.apache.fluss.types.DataTypeChecks.getScale;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * An implementation of {@link KeyDecoder} to decode keys following Paimon's 
encoding strategy.
+ *
+ * <p>This decoder reverses the encoding performed by {@link
+ * org.apache.fluss.row.encode.paimon.PaimonKeyEncoder}.
+ *
+ * <p>The binary format follows Paimon's BinaryRow layout:
+ *
+ * <pre>
+ * Fixed-length part:
+ * - 1 byte header (RowKind)
+ * - Null bit set (aligned to 8-byte boundaries)
+ * - Field values (8 bytes each):
+ *   - For fixed-length types: actual values
+ *   - For variable-length types: offset (4 bytes) + length (4 bytes)
+ *
+ * Variable-length part:
+ * - Actual data for variable-length fields (strings, bytes, etc.)
+ * </pre>
+ */
+public class PaimonKeyDecoder implements KeyDecoder {
+
+    // Paimon's BinaryRow reserves 1 byte (8 bits) at the head of the row for the RowKind header.
+    private static final int HEADER_SIZE_IN_BITS = 8;
+
+    // Data type of each key field, in key order.
+    private final DataType[] keyDataTypes;
+    // Pre-built reader for each key field, in key order.
+    private final FieldReader[] fieldReaders;
+    // Byte width of the header + null bit set region (rounded up to an 8-byte boundary).
+    private final int nullBitsSizeInBytes;
+
+    public PaimonKeyDecoder(RowType rowType, List<String> keys) {
+        this.keyDataTypes = new DataType[keys.size()];
+        this.fieldReaders = new FieldReader[keys.size()];
+        this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(keys.size());
+        for (int i = 0; i < keys.size(); i++) {
+            int keyIndex = rowType.getFieldIndex(keys.get(i));
+            this.keyDataTypes[i] = rowType.getTypeAt(keyIndex);
+            this.fieldReaders[i] = createFieldReader(keyDataTypes[i], i);
+        }
+    }
+
+    @Override
+    public InternalRow decodeKey(byte[] keyBytes) {
+        MemorySegment segment = MemorySegment.wrap(keyBytes);
+        GenericRow row = new GenericRow(keyDataTypes.length);
+
+        for (int i = 0; i < keyDataTypes.length; i++) {
+            if (isNullAt(segment, i)) {
+                row.setField(i, null);
+            } else {
+                // baseOffset is 0: the encoded key row always starts at the head of the segment.
+                Object value = fieldReaders[i].readField(segment, 0);
+                row.setField(i, value);
+            }
+        }
+
+        return row;
+    }
+
+    // Null bit i sits right after the 8 header bits, hence the HEADER_SIZE_IN_BITS shift.
+    private boolean isNullAt(MemorySegment segment, int pos) {
+        return BinarySegmentUtils.bitGet(segment, 0, pos + HEADER_SIZE_IN_BITS);
+    }
+
+    // Each field occupies an 8-byte slot in the fixed-length part, after the null bit set.
+    private int getFieldOffset(int pos) {
+        return nullBitsSizeInBytes + pos * 8;
+    }
+
+    // Header bits plus one null bit per field, rounded up to whole 8-byte words — this mirrors
+    // the width computed by the corresponding Paimon-style encoder.
+    private static int calculateBitSetWidthInBytes(int arity) {
+        return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8;
+    }
+
+    // Builds a reader for the pos-th key field; the fixed-part slot offset is captured eagerly.
+    private FieldReader createFieldReader(DataType fieldType, int pos) {
+        final int fieldOffset = getFieldOffset(pos);
+
+        switch (fieldType.getTypeRoot()) {
+            case BOOLEAN:
+                return (segment, baseOffset) -> segment.getBoolean(baseOffset + fieldOffset);
+            case TINYINT:
+                return (segment, baseOffset) -> segment.get(baseOffset + fieldOffset);
+            case SMALLINT:
+                return (segment, baseOffset) -> segment.getShort(baseOffset + fieldOffset);
+            case INTEGER:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+                return (segment, baseOffset) -> segment.getInt(baseOffset + fieldOffset);
+            case BIGINT:
+                return (segment, baseOffset) -> segment.getLong(baseOffset + fieldOffset);
+            case FLOAT:
+                return (segment, baseOffset) -> segment.getFloat(baseOffset + fieldOffset);
+            case DOUBLE:
+                return (segment, baseOffset) -> segment.getDouble(baseOffset + fieldOffset);
+            case CHAR:
+            case STRING:
+                return (segment, baseOffset) -> readString(segment, baseOffset, fieldOffset);
+            case BINARY:
+            case BYTES:
+                return (segment, baseOffset) -> readBinary(segment, baseOffset, fieldOffset);
+            case DECIMAL:
+                final int decimalPrecision = getPrecision(fieldType);
+                final int decimalScale = getScale(fieldType);
+                return (segment, baseOffset) ->
+                        readDecimal(
+                                segment, baseOffset, fieldOffset, decimalPrecision, decimalScale);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                final int timestampNtzPrecision = getPrecision(fieldType);
+                return (segment, baseOffset) ->
+                        readTimestampNtz(segment, baseOffset, fieldOffset, timestampNtzPrecision);
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                final int timestampLtzPrecision = getPrecision(fieldType);
+                return (segment, baseOffset) ->
+                        readTimestampLtz(segment, baseOffset, fieldOffset, timestampLtzPrecision);
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported type for Paimon key decoder: " + fieldType);
+        }
+    }
+
+    // The 8-byte slot holds an offset/length pair pointing into the variable-length part.
+    // NOTE(review): the hard-coded 0 passed as the base offset below relies on decodeKey always
+    // invoking readers with baseOffset == 0 — confirm before reusing with a non-zero baseOffset.
+    private BinaryString readString(MemorySegment segment, int baseOffset, int fieldOffset) {
+        final long offsetAndLen = segment.getLong(baseOffset + fieldOffset);
+        return BinarySegmentUtils.readBinaryString(
+                new MemorySegment[] {segment}, 0, fieldOffset, offsetAndLen);
+    }
+
+    // Same layout and same baseOffset == 0 assumption as readString above.
+    private byte[] readBinary(MemorySegment segment, int baseOffset, int fieldOffset) {
+        final long offsetAndLen = segment.getLong(baseOffset + fieldOffset);
+        return BinarySegmentUtils.readBinary(
+                new MemorySegment[] {segment}, 0, fieldOffset, offsetAndLen);
+    }
+
+    private Decimal readDecimal(
+            MemorySegment segment, int baseOffset, int fieldOffset, int precision, int scale) {
+        // Compact decimals fit in a long and are stored inline in the fixed-length slot.
+        if (Decimal.isCompact(precision)) {
+            return Decimal.fromUnscaledLong(
+                    segment.getLong(baseOffset + fieldOffset), precision, scale);
+        }
+
+        // Otherwise the slot holds an offset/size pair into the variable-length part.
+        final long offsetAndSize = segment.getLong(baseOffset + fieldOffset);
+        return BinarySegmentUtils.readDecimal(
+                new MemorySegment[] {segment}, 0, offsetAndSize, precision, scale);
+    }
+
+    private TimestampNtz readTimestampNtz(
+            MemorySegment segment, int baseOffset, int fieldOffset, int precision) {
+        // Compact timestamps are stored inline as a single long of milliseconds.
+        if (TimestampNtz.isCompact(precision)) {
+            return TimestampNtz.fromMillis(segment.getLong(baseOffset + fieldOffset));
+        }
+
+        // Non-compact: high 32 bits are the offset of the millisecond long, low 32 bits are
+        // the nano-of-millisecond. NOTE(review): subOffset is read without adding baseOffset,
+        // which also relies on baseOffset == 0 from decodeKey.
+        final long offsetAndNanoOfMilli = segment.getLong(baseOffset + fieldOffset);
+        final int nanoOfMillisecond = (int) offsetAndNanoOfMilli;
+        final int subOffset = (int) (offsetAndNanoOfMilli >> 32);
+        final long millisecond = segment.getLong(subOffset);
+        return TimestampNtz.fromMillis(millisecond, nanoOfMillisecond);
+    }
+
+    private TimestampLtz readTimestampLtz(
+            MemorySegment segment, int baseOffset, int fieldOffset, int precision) {
+        if (TimestampLtz.isCompact(precision)) {
+            return TimestampLtz.fromEpochMillis(segment.getLong(baseOffset + fieldOffset));
+        }
+
+        // Same non-compact offset/nano encoding (and baseOffset == 0 assumption) as NTZ above.
+        final long offsetAndNanoOfMilli = segment.getLong(baseOffset + fieldOffset);
+        final int nanoOfMillisecond = (int) offsetAndNanoOfMilli;
+        final int subOffset = (int) (offsetAndNanoOfMilli >> 32);
+        final long millisecond = segment.getLong(subOffset);
+        return TimestampLtz.fromEpochMillis(millisecond, nanoOfMillisecond);
+    }
+
+    /** Accessor for reading fields from Paimon binary format. */
+    interface FieldReader {
+        Object readField(MemorySegment segment, int baseOffset);
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/encode/paimon/PaimonBinaryRowWriter.java
 
b/fluss-common/src/main/java/org/apache/fluss/row/encode/paimon/PaimonBinaryRowWriter.java
index d4f098f9b..80bc37121 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/row/encode/paimon/PaimonBinaryRowWriter.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/encode/paimon/PaimonBinaryRowWriter.java
@@ -227,6 +227,7 @@ class PaimonBinaryRowWriter {
                 segment.putLong(cursor, value.getEpochMillisecond());
                 setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond());
             }
+            cursor += 8;
         }
     }
 
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/record/KeyRecordBatchTest.java 
b/fluss-common/src/test/java/org/apache/fluss/record/KeyRecordBatchTest.java
new file mode 100644
index 000000000..88d0ada50
--- /dev/null
+++ b/fluss-common/src/test/java/org/apache/fluss/record/KeyRecordBatchTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.row.BinaryRow;
+import org.apache.fluss.row.encode.CompactedKeyEncoder;
+import org.apache.fluss.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link KeyRecordBatch}. */
+class KeyRecordBatchTest extends KvTestBase {
+
+    @Test
+    void testSingleKeyField() {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .column("c", DataTypes.BIGINT())
+                        .primaryKey("a")
+                        .build();
+
+        CompactedKeyEncoder keyEncoder =
+                new CompactedKeyEncoder(schema.getRowType(), new int[] {0});
+        byte[] key1 = keyEncoder.encodeKey(row(1));
+        byte[] key2 = keyEncoder.encodeKey(row(2));
+
+        KeyRecordBatch batch =
+                new KeyRecordBatch(
+                        Arrays.asList(key1, key2), KvFormat.COMPACTED, DEFAULT_SCHEMA_ID);
+        KvRecordBatch.ReadContext context = createContext(schema);
+        Iterator<KvRecord> iterator = batch.records(context).iterator();
+
+        // Non-key columns carry no data in a key-only record, so they must decode to null.
+        BinaryRow row1 = iterator.next().getRow();
+        assertThat(row1.getInt(0)).isEqualTo(1);
+        assertThat(row1.isNullAt(1)).isTrue();
+        assertThat(row1.isNullAt(2)).isTrue();
+
+        BinaryRow row2 = iterator.next().getRow();
+        assertThat(row2.getInt(0)).isEqualTo(2);
+        assertThat(row2.isNullAt(1)).isTrue();
+        assertThat(row2.isNullAt(2)).isTrue();
+
+        assertThat(iterator.hasNext()).isFalse();
+    }
+
+    @Test
+    void testMultipleKeyFields() {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .column("c", DataTypes.BIGINT())
+                        .column("d", DataTypes.DOUBLE())
+                        .primaryKey("a", "b", "c")
+                        .build();
+
+        CompactedKeyEncoder keyEncoder =
+                new CompactedKeyEncoder(schema.getRowType(), new int[] {0, 1, 2});
+        byte[] key1 = keyEncoder.encodeKey(row(100, "key1", 1000L));
+        byte[] key2 = keyEncoder.encodeKey(row(200, "key2", 2000L));
+
+        KeyRecordBatch batch =
+                new KeyRecordBatch(
+                        Arrays.asList(key1, key2), KvFormat.COMPACTED, DEFAULT_SCHEMA_ID);
+        KvRecordBatch.ReadContext context = createContext(schema);
+        Iterator<KvRecord> iterator = batch.records(context).iterator();
+
+        // All three key columns are restored at their original field positions; "d" stays null.
+        BinaryRow row1 = iterator.next().getRow();
+        assertThat(row1.getInt(0)).isEqualTo(100);
+        assertThat(row1.getString(1).toString()).isEqualTo("key1");
+        assertThat(row1.getLong(2)).isEqualTo(1000L);
+        assertThat(row1.isNullAt(3)).isTrue();
+
+        BinaryRow row2 = iterator.next().getRow();
+        assertThat(row2.getInt(0)).isEqualTo(200);
+        assertThat(row2.getString(1).toString()).isEqualTo("key2");
+        assertThat(row2.getLong(2)).isEqualTo(2000L);
+        assertThat(row2.isNullAt(3)).isTrue();
+
+        assertThat(iterator.hasNext()).isFalse();
+    }
+
+    @Test
+    void testNonSequentialKeyFields() {
+        // Key at end: pk=(c), fields=(a, b, c)
+        Schema schema1 =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .column("c", DataTypes.BIGINT())
+                        .primaryKey("c")
+                        .build();
+        CompactedKeyEncoder encoder1 = new CompactedKeyEncoder(schema1.getRowType(), new int[] {2});
+        byte[] key1 = encoder1.encodeKey(row(schema1.getRowType(), null, null, 1000L));
+        KeyRecordBatch batch1 =
+                new KeyRecordBatch(Arrays.asList(key1), KvFormat.COMPACTED, DEFAULT_SCHEMA_ID);
+        BinaryRow row1 = batch1.records(createContext(schema1)).iterator().next().getRow();
+        assertThat(row1.isNullAt(0)).isTrue();
+        assertThat(row1.isNullAt(1)).isTrue();
+        assertThat(row1.getLong(2)).isEqualTo(1000L);
+
+        // Non-consecutive: pk=(b, d), fields=(a, b, c, d, e)
+        Schema schema2 =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .column("c", DataTypes.BIGINT())
+                        .column("d", DataTypes.DOUBLE())
+                        .column("e", DataTypes.BOOLEAN())
+                        .primaryKey("b", "d")
+                        .build();
+        CompactedKeyEncoder encoder2 =
+                new CompactedKeyEncoder(schema2.getRowType(), new int[] {1, 3});
+        byte[] key2 = encoder2.encodeKey(row(schema2.getRowType(), null, "key1", null, 1.5, null));
+        KeyRecordBatch batch2 =
+                new KeyRecordBatch(Arrays.asList(key2), KvFormat.COMPACTED, DEFAULT_SCHEMA_ID);
+        BinaryRow row2 = batch2.records(createContext(schema2)).iterator().next().getRow();
+        assertThat(row2.isNullAt(0)).isTrue();
+        assertThat(row2.getString(1).toString()).isEqualTo("key1");
+        assertThat(row2.isNullAt(2)).isTrue();
+        assertThat(row2.getDouble(3)).isEqualTo(1.5);
+        assertThat(row2.isNullAt(4)).isTrue();
+
+        // Complex ordering: pk=(d, a, c), fields=(a, b, c, d, e, f)
+        Schema schema3 =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .column("c", DataTypes.BIGINT())
+                        .column("d", DataTypes.DOUBLE())
+                        .column("e", DataTypes.BOOLEAN())
+                        .column("f", DataTypes.FLOAT())
+                        .primaryKey("d", "a", "c")
+                        .build();
+        CompactedKeyEncoder encoder3 =
+                new CompactedKeyEncoder(schema3.getRowType(), new int[] {3, 0, 2});
+        byte[] key3 = encoder3.encodeKey(row(100, null, 1000L, 1.5, null, null));
+        KeyRecordBatch batch3 =
+                new KeyRecordBatch(Arrays.asList(key3), KvFormat.COMPACTED, DEFAULT_SCHEMA_ID);
+        BinaryRow row3 = batch3.records(createContext(schema3)).iterator().next().getRow();
+        assertThat(row3.getInt(0)).isEqualTo(100);
+        assertThat(row3.isNullAt(1)).isTrue();
+        assertThat(row3.getLong(2)).isEqualTo(1000L);
+        assertThat(row3.getDouble(3)).isEqualTo(1.5);
+        assertThat(row3.isNullAt(4)).isTrue();
+        assertThat(row3.isNullAt(5)).isTrue();
+    }
+
+    @Test
+    void testEmptyBatch() {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .primaryKey("a")
+                        .build();
+
+        // A batch over an empty key list reports zero records and yields an empty iterator.
+        KeyRecordBatch batch =
+                new KeyRecordBatch(Arrays.asList(), KvFormat.COMPACTED, DEFAULT_SCHEMA_ID);
+        assertThat(batch.getRecordCount()).isEqualTo(0);
+        assertThat(batch.records(createContext(schema)).iterator().hasNext()).isFalse();
+    }
+
+    // Builds a read context that resolves DEFAULT_SCHEMA_ID to the given schema.
+    private KvRecordBatch.ReadContext createContext(Schema schema) {
+        return KvRecordReadContext.createReadContext(
+                KvFormat.COMPACTED, new TestingSchemaGetter(DEFAULT_SCHEMA_ID, schema));
+    }
+}
diff --git a/fluss-common/src/test/java/org/apache/fluss/record/TestData.java 
b/fluss-common/src/test/java/org/apache/fluss/record/TestData.java
index 874377125..b67e6443e 100644
--- a/fluss-common/src/test/java/org/apache/fluss/record/TestData.java
+++ b/fluss-common/src/test/java/org/apache/fluss/record/TestData.java
@@ -216,6 +216,34 @@ public final class TestData {
                     .withComment("b is second column")
                     .primaryKey("a")
                     .build();
+
+    // DATA3 with auto-increment column for testing lookup-insert-if-not-exists
+    // Full row type of the DATA3 auto-increment table: (a INT, b STRING, c BIGINT).
+    public static final RowType DATA3_ROW_TYPE =
+            DataTypes.ROW(
+                    new DataField("a", DataTypes.INT()),
+                    new DataField("b", DataTypes.STRING()),
+                    new DataField("c", DataTypes.BIGINT()));
+    // Key row type: just the single primary-key column "a".
+    public static final RowType DATA3_KEY_TYPE = DataTypes.ROW(new DataField("a", DataTypes.INT()));
+    public static final Schema DATA3_SCHEMA_PK_AUTO_INC =
+            Schema.newBuilder()
+                    .column("a", DataTypes.INT())
+                    .withComment("a is primary key column")
+                    .column("b", DataTypes.STRING())
+                    .withComment("b is regular column")
+                    .column("c", DataTypes.BIGINT())
+                    .withComment("c is auto-increment column")
+                    .primaryKey("a")
+                    .enableAutoIncrement("c")
+                    .build();
+    public static final TablePath DATA3_TABLE_PATH_PK_AUTO_INC =
+            TablePath.of("test_db_3", "test_pk_table_auto_inc");
+    public static final long DATA3_TABLE_ID_PK_AUTO_INC = 150004L;
+    // Table is hash-distributed into 3 buckets by the primary-key column "a".
+    public static final TableDescriptor DATA3_TABLE_DESCRIPTOR_PK_AUTO_INC =
+            TableDescriptor.builder()
+                    .schema(DATA3_SCHEMA_PK_AUTO_INC)
+                    .distributedBy(3, "a")
+                    .build();
+
     // ---------------------------- data3 table info end 
------------------------------
 
     public static final TestingSchemaGetter TEST_SCHEMA_GETTER =
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/decode/CompactedKeyDecoderTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/row/decode/CompactedKeyDecoderTest.java
new file mode 100644
index 000000000..529fb4c57
--- /dev/null
+++ 
b/fluss-common/src/test/java/org/apache/fluss/row/decode/CompactedKeyDecoderTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.decode;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.row.encode.CompactedKeyEncoder;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.BiConsumer;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link CompactedKeyDecoder}. */
+class CompactedKeyDecoderTest {
+
+    @Test
+    void testDecodeKey() {
+        // All fields as keys
+        verifyDecode(
+                RowType.of(DataTypes.INT(), DataTypes.BIGINT(), 
DataTypes.INT()),
+                new int[] {0, 1, 2},
+                row(1, 3L, 2),
+                (decoded, original) -> {
+                    assertThat(decoded.getFieldCount()).isEqualTo(3);
+                    assertThat(decoded.getInt(0)).isEqualTo(1);
+                    assertThat(decoded.getLong(1)).isEqualTo(3L);
+                    assertThat(decoded.getInt(2)).isEqualTo(2);
+                });
+
+        // Partial key - single INT key
+        RowType type1 =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.STRING(), 
DataTypes.BIGINT()},
+                        new String[] {"id", "name", "value"});
+        verifyDecode(
+                type1,
+                Collections.singletonList("id"),
+                row(100, "Alice", 999L),
+                (decoded, original) -> {
+                    assertThat(decoded.getFieldCount()).isEqualTo(1);
+                    assertThat(decoded.getInt(0)).isEqualTo(100);
+                });
+
+        // Single STRING key at non-first position
+        RowType type2 =
+                RowType.of(
+                        new DataType[] {DataTypes.STRING(), 
DataTypes.BIGINT(), DataTypes.STRING()},
+                        new String[] {"partition", "f1", "f2"});
+        verifyDecode(
+                type2,
+                Collections.singletonList("f2"),
+                row("p1", 1L, "a2"),
+                (decoded, original) -> {
+                    assertThat(decoded.getFieldCount()).isEqualTo(1);
+                    
assertThat(decoded.getString(0).toString()).isEqualTo("a2");
+                });
+
+        // Multiple keys (INT, STRING)
+        RowType type3 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(),
+                            DataTypes.STRING(),
+                            DataTypes.BIGINT(),
+                            DataTypes.DOUBLE(),
+                            DataTypes.BOOLEAN()
+                        },
+                        new String[] {"id", "name", "age", "score", "active"});
+        verifyDecode(
+                type3,
+                Arrays.asList("id", "name"),
+                row(100, "Alice", 25L, 95.5, true),
+                (decoded, original) -> {
+                    assertThat(decoded.getFieldCount()).isEqualTo(2);
+                    assertThat(decoded.getInt(0)).isEqualTo(100);
+                    
assertThat(decoded.getString(1).toString()).isEqualTo("Alice");
+                });
+
+        // Non-sequential key positions
+        RowType type4 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING(),
+                            DataTypes.INT(),
+                            DataTypes.STRING(),
+                            DataTypes.BIGINT(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"a", "b", "c", "d", "e"});
+        verifyDecode(
+                type4,
+                Arrays.asList("b", "d"),
+                row("v0", 42, "v2", 999L, "v4"),
+                (decoded, original) -> {
+                    assertThat(decoded.getFieldCount()).isEqualTo(2);
+                    assertThat(decoded.getInt(0)).isEqualTo(42);
+                    assertThat(decoded.getLong(1)).isEqualTo(999L);
+                });
+
+        // Various data types at non-sequential positions
+        RowType type5 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.BOOLEAN(),
+                            DataTypes.TINYINT(),
+                            DataTypes.SMALLINT(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.FLOAT(),
+                            DataTypes.DOUBLE(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"a", "b", "c", "d", "e", "f", "g", "h"});
+        verifyDecode(
+                type5,
+                Arrays.asList("b", "d", "g"),
+                row(true, (byte) 1, (short) 2, 3, 4L, 5.0f, 6.0, 
BinaryString.fromString("test")),
+                (decoded, original) -> {
+                    assertThat(decoded.getFieldCount()).isEqualTo(3);
+                    assertThat(decoded.getByte(0)).isEqualTo((byte) 1);
+                    assertThat(decoded.getInt(1)).isEqualTo(3);
+                    assertThat(decoded.getDouble(2)).isEqualTo(6.0);
+                });
+    }
+
+    @Test
+    void testEncodeDecodeRoundTrip() {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.STRING(), 
DataTypes.BIGINT()},
+                        new String[] {"id", "name", "value"});
+        List<String> pk = Arrays.asList("id", "name");
+        CompactedKeyEncoder encoder = 
CompactedKeyEncoder.createKeyEncoder(rowType, pk);
+        CompactedKeyDecoder decoder = 
CompactedKeyDecoder.createKeyDecoder(rowType, pk);
+
+        for (InternalRow original :
+                new InternalRow[] {
+                    row(1, "alice", 100L),
+                    row(2, "bob", 200L),
+                    row(999, "test", 999L),
+                    row(0, "", 0L)
+                }) {
+            InternalRow decoded = 
decoder.decodeKey(encoder.encodeKey(original));
+            assertThat(decoded.getInt(0)).isEqualTo(original.getInt(0));
+            
assertThat(decoded.getString(1).toString()).isEqualTo(original.getString(1).toString());
+        }
+    }
+
+    @Test
+    void testDecodeInvalidKey() {
+        RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING());
+        assertThatThrownBy(
+                        () ->
+                                CompactedKeyDecoder.createKeyDecoder(
+                                        rowType, 
Collections.singletonList("invalidField")))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("invalidField not found in row type");
+    }
+
+    @Test
+    void testAllSupportedPrimaryKeyTypes() {
+        // Test all supported primary key data types
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.BOOLEAN(),
+                            DataTypes.TINYINT(),
+                            DataTypes.SMALLINT(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.FLOAT(),
+                            DataTypes.DOUBLE(),
+                            DataTypes.STRING(),
+                            DataTypes.CHAR(10),
+                            DataTypes.BINARY(5),
+                            DataTypes.BYTES(),
+                            DataTypes.DATE(),
+                            DataTypes.TIME(),
+                            DataTypes.TIMESTAMP(),
+                            DataTypes.TIMESTAMP_LTZ(),
+                            DataTypes.DECIMAL(10, 2)
+                        },
+                        new String[] {
+                            "f_boolean",
+                            "f_tinyint",
+                            "f_smallint",
+                            "f_int",
+                            "f_bigint",
+                            "f_float",
+                            "f_double",
+                            "f_string",
+                            "f_char",
+                            "f_binary",
+                            "f_bytes",
+                            "f_date",
+                            "f_time",
+                            "f_timestamp",
+                            "f_timestamp_ltz",
+                            "f_decimal"
+                        });
+
+        List<String> allPrimaryKeys =
+                Arrays.asList(
+                        "f_boolean",
+                        "f_tinyint",
+                        "f_smallint",
+                        "f_int",
+                        "f_bigint",
+                        "f_float",
+                        "f_double",
+                        "f_string",
+                        "f_char",
+                        "f_binary",
+                        "f_bytes",
+                        "f_date",
+                        "f_time",
+                        "f_timestamp",
+                        "f_timestamp_ltz",
+                        "f_decimal");
+
+        // Test data values
+        InternalRow testRow =
+                row(
+                        true, // BOOLEAN
+                        (byte) 127, // TINYINT
+                        (short) 32767, // SMALLINT
+                        2147483647, // INT
+                        9223372036854775807L, // BIGINT
+                        3.14f, // FLOAT
+                        2.718281828, // DOUBLE
+                        BinaryString.fromString("test_string"), // STRING
+                        BinaryString.fromString("test_char"), // CHAR
+                        new byte[] {1, 2, 3, 4, 5}, // BINARY
+                        new byte[] {10, 20, 30}, // BYTES
+                        18000, // DATE (days since epoch)
+                        3600000, // TIME (milliseconds since midnight)
+                        TimestampNtz.fromMillis(1609459200000L), // TIMESTAMP
+                        TimestampLtz.fromEpochMillis(1609459200000L), // TIMESTAMP_LTZ
+                        Decimal.fromBigDecimal(
+                                new java.math.BigDecimal("12345678.90"), 10, 2) // DECIMAL
+                        );
+
+        verifyDecode(
+                rowType,
+                allPrimaryKeys,
+                testRow,
+                (decoded, original) -> {
+                    assertThat(decoded.getFieldCount()).isEqualTo(16);
+                    assertThat(decoded.getBoolean(0)).isEqualTo(true);
+                    assertThat(decoded.getByte(1)).isEqualTo((byte) 127);
+                    assertThat(decoded.getShort(2)).isEqualTo((short) 32767);
+                    assertThat(decoded.getInt(3)).isEqualTo(2147483647);
+                    assertThat(decoded.getLong(4)).isEqualTo(9223372036854775807L);
+                    assertThat(decoded.getFloat(5)).isEqualTo(3.14f);
+                    assertThat(decoded.getDouble(6)).isEqualTo(2.718281828);
+                    assertThat(decoded.getString(7).toString()).isEqualTo("test_string");
+                    assertThat(decoded.getChar(8, 10).toString()).isEqualTo("test_char");
+                    assertThat(decoded.getBinary(9, 5)).isEqualTo(new byte[] {1, 2, 3, 4, 5});
+                    assertThat(decoded.getBytes(10)).isEqualTo(new byte[] {10, 20, 30});
+                    assertThat(decoded.getInt(11)).isEqualTo(18000);
+                    assertThat(decoded.getInt(12)).isEqualTo(3600000);
+                    assertThat(decoded.getTimestampNtz(13, 6).getMillisecond())
+                            .isEqualTo(1609459200000L);
+                    assertThat(decoded.getTimestampLtz(14, 6).getEpochMillisecond())
+                            .isEqualTo(1609459200000L);
+                    assertThat(decoded.getDecimal(15, 10, 2).toBigDecimal())
+                            .isEqualTo(new java.math.BigDecimal("12345678.90"));
+                });
+    }
+
+    private void verifyDecode(
+            RowType rowType,
+            int[] keyPos,
+            InternalRow original,
+            BiConsumer<InternalRow, InternalRow> assertions) {
+        CompactedKeyEncoder encoder = new CompactedKeyEncoder(rowType, keyPos);
+        CompactedKeyDecoder decoder = new CompactedKeyDecoder(rowType, keyPos);
+        assertions.accept(decoder.decodeKey(encoder.encodeKey(original)), original);
+    }
+
+    private void verifyDecode(
+            RowType rowType,
+            List<String> keys,
+            InternalRow original,
+            BiConsumer<InternalRow, InternalRow> assertions) {
+        CompactedKeyEncoder encoder = CompactedKeyEncoder.createKeyEncoder(rowType, keys);
+        CompactedKeyDecoder decoder = CompactedKeyDecoder.createKeyDecoder(rowType, keys);
+        assertions.accept(decoder.decodeKey(encoder.encodeKey(original)), original);
+    }
+}
diff --git a/fluss-common/src/test/java/org/apache/fluss/row/decode/iceberg/IcebergKeyDecoderTest.java b/fluss-common/src/test/java/org/apache/fluss/row/decode/iceberg/IcebergKeyDecoderTest.java
new file mode 100644
index 000000000..49b384375
--- /dev/null
+++ b/fluss-common/src/test/java/org/apache/fluss/row/decode/iceberg/IcebergKeyDecoderTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.decode.iceberg;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.row.encode.iceberg.IcebergKeyEncoder;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Unit tests for {@link IcebergKeyDecoder} to verify decoding correctness and round-trip with
+ * encoder.
+ */
+class IcebergKeyDecoderTest {
+
+    @Test
+    void testSingleKeyFieldRequirement() {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.STRING()},
+                        new String[] {"id", "name"});
+
+        // Should succeed with single key
+        IcebergKeyDecoder decoder = new IcebergKeyDecoder(rowType, Collections.singletonList("id"));
+        assertThat(decoder).isNotNull();
+
+        // Should fail with multiple keys
+        assertThatThrownBy(() -> new IcebergKeyDecoder(rowType, Arrays.asList("id", "name")))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Key fields must have exactly one field");
+    }
+
+    @Test
+    void testDecodeInteger() {
+        RowType rowType = RowType.of(new DataType[] {DataTypes.INT()}, new String[] {"id"});
+        IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType, Collections.singletonList("id"));
+        IcebergKeyDecoder decoder = new IcebergKeyDecoder(rowType, Collections.singletonList("id"));
+
+        for (int value : new int[] {0, -42, 42, Integer.MAX_VALUE, Integer.MIN_VALUE}) {
+            InternalRow original = row(value);
+            byte[] encoded = encoder.encodeKey(original);
+            InternalRow decoded = decoder.decodeKey(encoded);
+            assertThat(decoded.getInt(0)).isEqualTo(value);
+        }
+    }
+
+    @Test
+    void testDecodeLong() {
+        RowType rowType = RowType.of(new DataType[] {DataTypes.BIGINT()}, new 
String[] {"id"});
+        IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType, 
Collections.singletonList("id"));
+        IcebergKeyDecoder decoder = new IcebergKeyDecoder(rowType, 
Collections.singletonList("id"));
+
+        for (long value : new long[] {0L, -999L, 1234567890123456789L, 
Long.MAX_VALUE}) {
+            InternalRow original = row(value);
+            byte[] encoded = encoder.encodeKey(original);
+            InternalRow decoded = decoder.decodeKey(encoded);
+            assertThat(decoded.getLong(0)).isEqualTo(value);
+        }
+    }
+
+    @Test
+    void testDecodeDate() {
+        RowType rowType = RowType.of(new DataType[] {DataTypes.DATE()}, new 
String[] {"date"});
+        IcebergKeyEncoder encoder =
+                new IcebergKeyEncoder(rowType, 
Collections.singletonList("date"));
+        IcebergKeyDecoder decoder =
+                new IcebergKeyDecoder(rowType, 
Collections.singletonList("date"));
+
+        for (int value : new int[] {0, 19655}) {
+            InternalRow original = row(value);
+            byte[] encoded = encoder.encodeKey(original);
+            InternalRow decoded = decoder.decodeKey(encoded);
+            assertThat(decoded.getInt(0)).isEqualTo(value);
+        }
+    }
+
+    @Test
+    void testDecodeTime() {
+        RowType rowType = RowType.of(new DataType[] {DataTypes.TIME()}, new 
String[] {"time"});
+        IcebergKeyEncoder encoder =
+                new IcebergKeyEncoder(rowType, 
Collections.singletonList("time"));
+        IcebergKeyDecoder decoder =
+                new IcebergKeyDecoder(rowType, 
Collections.singletonList("time"));
+
+        for (int value : new int[] {0, 34200000, 86399999}) {
+            InternalRow original = row(value);
+            byte[] encoded = encoder.encodeKey(original);
+            InternalRow decoded = decoder.decodeKey(encoded);
+            assertThat(decoded.getInt(0)).isEqualTo(value);
+        }
+    }
+
+    @Test
+    void testDecodeTimestamp() {
+        RowType rowType = RowType.of(new DataType[] {DataTypes.TIMESTAMP(6)}, 
new String[] {"ts"});
+        IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType, 
Collections.singletonList("ts"));
+        IcebergKeyDecoder decoder = new IcebergKeyDecoder(rowType, 
Collections.singletonList("ts"));
+
+        // Iceberg uses microsecond precision, so only test values that are 
multiples of 1000 nanos
+        long[] millisValues = {0L, 1000L, 1698235273182L};
+        int[] nanosValues = {
+            0, 0, 123000, 999000
+        }; // Must be multiples of 1000 for microsecond precision
+
+        for (int i = 0; i < millisValues.length; i++) {
+            InternalRow original =
+                    row((Object) TimestampNtz.fromMillis(millisValues[i], 
nanosValues[i]));
+            byte[] encoded = encoder.encodeKey(original);
+            InternalRow decoded = decoder.decodeKey(encoded);
+            TimestampNtz ts = decoded.getTimestampNtz(0, 6);
+            assertThat(ts.getMillisecond()).isEqualTo(millisValues[i]);
+            assertThat(ts.getNanoOfMillisecond()).isEqualTo(nanosValues[i]);
+        }
+    }
+
+    @Test
+    void testDecodeString() {
+        RowType rowType = RowType.of(new DataType[] {DataTypes.STRING()}, new 
String[] {"name"});
+        IcebergKeyEncoder encoder =
+                new IcebergKeyEncoder(rowType, 
Collections.singletonList("name"));
+        IcebergKeyDecoder decoder =
+                new IcebergKeyDecoder(rowType, 
Collections.singletonList("name"));
+
+        for (String value : new String[] {"", "a", "Hello", "Hello Iceberg!", 
"UTF-8: 你好世界"}) {
+            InternalRow original = row((Object) 
BinaryString.fromString(value));
+            byte[] encoded = encoder.encodeKey(original);
+            InternalRow decoded = decoder.decodeKey(encoded);
+            assertThat(decoded.getString(0).toString()).isEqualTo(value);
+        }
+    }
+
+    @Test
+    void testDecodeBinary() {
+        RowType rowType = RowType.of(new DataType[] {DataTypes.BYTES()}, new 
String[] {"data"});
+        IcebergKeyEncoder encoder =
+                new IcebergKeyEncoder(rowType, 
Collections.singletonList("data"));
+        IcebergKeyDecoder decoder =
+                new IcebergKeyDecoder(rowType, 
Collections.singletonList("data"));
+
+        for (byte[] value :
+                new byte[][] {new byte[0], "test".getBytes(), new byte[] {0, 
1, 127, -128, -1}}) {
+            InternalRow original = row((Object) value);
+            byte[] encoded = encoder.encodeKey(original);
+            InternalRow decoded = decoder.decodeKey(encoded);
+            assertThat(decoded.getBytes(0)).isEqualTo(value);
+        }
+    }
+
+    @Test
+    void testDecodeDecimal() {
+        RowType rowType =
+                RowType.of(new DataType[] {DataTypes.DECIMAL(10, 2)}, new 
String[] {"amount"});
+        IcebergKeyEncoder encoder =
+                new IcebergKeyEncoder(rowType, 
Collections.singletonList("amount"));
+        IcebergKeyDecoder decoder =
+                new IcebergKeyDecoder(rowType, 
Collections.singletonList("amount"));
+
+        for (String decimalStr : new String[] {"0.00", "123.45", "-999.99", 
"12345678.90"}) {
+            BigDecimal value = new BigDecimal(decimalStr);
+            Decimal decimal = Decimal.fromBigDecimal(value, 10, 2);
+            InternalRow original = row((Object) decimal);
+            byte[] encoded = encoder.encodeKey(original);
+            InternalRow decoded = decoder.decodeKey(encoded);
+            assertThat(decoded.getDecimal(0, 10, 
2).toBigDecimal()).isEqualTo(value);
+        }
+
+        // Test high precision decimal
+        RowType highPrecisionRowType =
+                RowType.of(new DataType[] {DataTypes.DECIMAL(38, 10)}, new 
String[] {"big_amount"});
+        IcebergKeyEncoder highPrecisionEncoder =
+                new IcebergKeyEncoder(
+                        highPrecisionRowType, 
Collections.singletonList("big_amount"));
+        IcebergKeyDecoder highPrecisionDecoder =
+                new IcebergKeyDecoder(
+                        highPrecisionRowType, 
Collections.singletonList("big_amount"));
+        BigDecimal highValue = new 
BigDecimal("1234567890123456789012345678.1234567890");
+        Decimal highDecimal = Decimal.fromBigDecimal(highValue, 38, 10);
+        InternalRow highOriginal = row((Object) highDecimal);
+        byte[] encoded = highPrecisionEncoder.encodeKey(highOriginal);
+        InternalRow decoded = highPrecisionDecoder.decodeKey(encoded);
+        assertThat(decoded.getDecimal(0, 38, 
10).toBigDecimal()).isEqualTo(highValue);
+    }
+}
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoderTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoderTest.java
new file mode 100644
index 000000000..6066d827f
--- /dev/null
+++ 
b/fluss-common/src/test/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoderTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.decode.paimon;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.row.encode.paimon.PaimonKeyEncoder;
+import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.row.indexed.IndexedRowWriter;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link PaimonKeyDecoder} to verify decoding correctness and 
round-trip. */
+class PaimonKeyDecoderTest {
+
+    @Test
+    void testDecodeAllTypes() {
+        // Create a row type with all supported types as primary key fields
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.BOOLEAN(),
+                            DataTypes.TINYINT(),
+                            DataTypes.SMALLINT(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.FLOAT(),
+                            DataTypes.DOUBLE(),
+                            DataTypes.DATE(),
+                            DataTypes.TIME(),
+                            DataTypes.CHAR(5),
+                            DataTypes.STRING(),
+                            DataTypes.BINARY(10),
+                            DataTypes.BYTES(),
+                            DataTypes.DECIMAL(5, 2),
+                            DataTypes.DECIMAL(20, 0),
+                            DataTypes.TIMESTAMP(1),
+                            DataTypes.TIMESTAMP(6),
+                            DataTypes.TIMESTAMP_LTZ(1),
+                            DataTypes.TIMESTAMP_LTZ(6)
+                        },
+                        new String[] {
+                            "bool_col",
+                            "byte_col",
+                            "short_col",
+                            "int_col",
+                            "long_col",
+                            "float_col",
+                            "double_col",
+                            "date_col",
+                            "time_col",
+                            "char_col",
+                            "string_col",
+                            "binary_col",
+                            "bytes_col",
+                            "compact_decimal_col",
+                            "non_compact_decimal_col",
+                            "compact_ts_col",
+                            "non_compact_ts_col",
+                            "compact_ltz_col",
+                            "non_compact_ltz_col"
+                        });
+
+        List<String> keys = rowType.getFieldNames();
+        PaimonKeyEncoder encoder = new PaimonKeyEncoder(rowType, keys);
+        PaimonKeyDecoder decoder = new PaimonKeyDecoder(rowType, keys);
+
+        long millis = 1698235273182L;
+        int nanos = 456789;
+
+        // Create test row with all types
+        InternalRow original = createAllTypesRow(millis, nanos);
+
+        // Encode and decode
+        byte[] encoded = encoder.encodeKey(original);
+        InternalRow decoded = decoder.decodeKey(encoded);
+
+        // Verify all fields
+        assertThat(decoded.getFieldCount()).isEqualTo(19);
+        assertThat(decoded.getBoolean(0)).isTrue();
+        assertThat(decoded.getByte(1)).isEqualTo((byte) 127);
+        assertThat(decoded.getShort(2)).isEqualTo((short) 32767);
+        assertThat(decoded.getInt(3)).isEqualTo(Integer.MAX_VALUE);
+        assertThat(decoded.getLong(4)).isEqualTo(Long.MAX_VALUE);
+        assertThat(decoded.getFloat(5))
+                .isCloseTo(3.14f, org.assertj.core.data.Offset.offset(0.01f));
+        assertThat(decoded.getDouble(6))
+                .isCloseTo(2.718, org.assertj.core.data.Offset.offset(0.001));
+        assertThat(decoded.getInt(7)).isEqualTo(19655);
+        assertThat(decoded.getInt(8)).isEqualTo(34200000);
+        assertThat(decoded.getString(9).toString()).isEqualTo("hello");
+        assertThat(decoded.getString(10).toString()).isEqualTo("world");
+        assertThat(decoded.getBytes(11)).isEqualTo("test".getBytes());
+        assertThat(decoded.getBytes(12)).isEqualTo("data".getBytes());
+        assertThat(decoded.getDecimal(13, 5, 2).toBigDecimal()).isEqualTo(new BigDecimal("123.45"));
+        assertThat(decoded.getDecimal(14, 20, 0).toBigDecimal())
+                .isEqualTo(new BigDecimal("12345678901234567890"));
+        assertThat(decoded.getTimestampNtz(15, 1).getMillisecond()).isEqualTo(millis);
+        TimestampNtz decodedTs = decoded.getTimestampNtz(16, 6);
+        assertThat(decodedTs.getMillisecond()).isEqualTo(millis);
+        assertThat(decodedTs.getNanoOfMillisecond()).isEqualTo(nanos);
+        assertThat(decoded.getTimestampLtz(17, 1).getEpochMillisecond()).isEqualTo(millis);
+        TimestampLtz decodedLtz = decoded.getTimestampLtz(18, 6);
+        assertThat(decodedLtz.getEpochMillisecond()).isEqualTo(millis);
+        assertThat(decodedLtz.getNanoOfMillisecond()).isEqualTo(nanos);
+    }
+
+    @Test
+    void testDecodeStringVariants() {
+        RowType rowType = RowType.of(new DataType[] {DataTypes.STRING()}, new 
String[] {"s"});
+        PaimonKeyEncoder encoder = new PaimonKeyEncoder(rowType, 
rowType.getFieldNames());
+        PaimonKeyDecoder decoder = new PaimonKeyDecoder(rowType, 
rowType.getFieldNames());
+
+        // Test short strings (≤7 bytes with 0x80 marker) and long strings (>7 
bytes)
+        for (String testStr : Arrays.asList("", "a", "1234567", "12345678", 
"Hello, Paimon!")) {
+            InternalRow original = createStringRow(testStr);
+            byte[] encoded = encoder.encodeKey(original);
+            InternalRow decoded = decoder.decodeKey(encoded);
+            assertThat(decoded.getString(0).toString()).isEqualTo(testStr);
+        }
+    }
+
+    @Test
+    void testDecodePartialKeys() {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.STRING(), 
DataTypes.BIGINT()},
+                        new String[] {"id", "name", "value"});
+
+        List<String> keys = Collections.singletonList("id");
+        PaimonKeyEncoder encoder = new PaimonKeyEncoder(rowType, keys);
+        PaimonKeyDecoder decoder = new PaimonKeyDecoder(rowType, keys);
+
+        InternalRow original = createPartialKeyRow();
+        byte[] encoded = encoder.encodeKey(original);
+        InternalRow decoded = decoder.decodeKey(encoded);
+
+        assertThat(decoded.getFieldCount()).isEqualTo(1);
+        assertThat(decoded.getInt(0)).isEqualTo(100);
+    }
+
+    private InternalRow createAllTypesRow(long millis, int nanos) {
+        DataType[] dataTypes =
+                new DataType[] {
+                    DataTypes.BOOLEAN(),
+                    DataTypes.TINYINT(),
+                    DataTypes.SMALLINT(),
+                    DataTypes.INT(),
+                    DataTypes.BIGINT(),
+                    DataTypes.FLOAT(),
+                    DataTypes.DOUBLE(),
+                    DataTypes.DATE(),
+                    DataTypes.TIME(),
+                    DataTypes.CHAR(5),
+                    DataTypes.STRING(),
+                    DataTypes.BINARY(10),
+                    DataTypes.BYTES(),
+                    DataTypes.DECIMAL(5, 2),
+                    DataTypes.DECIMAL(20, 0),
+                    DataTypes.TIMESTAMP(1),
+                    DataTypes.TIMESTAMP(6),
+                    DataTypes.TIMESTAMP_LTZ(1),
+                    DataTypes.TIMESTAMP_LTZ(6)
+                };
+        IndexedRow indexedRow = new IndexedRow(dataTypes);
+        IndexedRowWriter writer = new IndexedRowWriter(dataTypes);
+        writer.writeBoolean(true);
+        writer.writeByte((byte) 127);
+        writer.writeShort((short) 32767);
+        writer.writeInt(Integer.MAX_VALUE);
+        writer.writeLong(Long.MAX_VALUE);
+        writer.writeFloat(3.14f);
+        writer.writeDouble(2.718);
+        writer.writeInt(19655);
+        writer.writeInt(34200000);
+        writer.writeChar(BinaryString.fromString("hello"), 5);
+        writer.writeString(BinaryString.fromString("world"));
+        writer.writeBinary("test".getBytes(), 10);
+        writer.writeBytes("data".getBytes());
+        writer.writeDecimal(Decimal.fromBigDecimal(new BigDecimal("123.45"), 
5, 2), 5);
+        writer.writeDecimal(
+                Decimal.fromBigDecimal(new BigDecimal("12345678901234567890"), 
20, 0), 20);
+        writer.writeTimestampNtz(TimestampNtz.fromMillis(millis), 1);
+        writer.writeTimestampNtz(TimestampNtz.fromMillis(millis, nanos), 6);
+        writer.writeTimestampLtz(TimestampLtz.fromEpochMillis(millis), 1);
+        writer.writeTimestampLtz(TimestampLtz.fromEpochMillis(millis, nanos), 
6);
+        indexedRow.pointTo(writer.segment(), 0, writer.position());
+        return indexedRow;
+    }
+
+    private InternalRow createStringRow(String value) {
+        return row((Object) BinaryString.fromString(value));
+    }
+
+    private InternalRow createPartialKeyRow() {
+        return row(100, BinaryString.fromString("Alice"), 999L);
+    }
+}
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/encode/paimon/PaimonKeyEncoderTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/row/encode/paimon/PaimonKeyEncoderTest.java
index c1093c97f..e770d96de 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/row/encode/paimon/PaimonKeyEncoderTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/row/encode/paimon/PaimonKeyEncoderTest.java
@@ -84,7 +84,7 @@ class PaimonKeyEncoderTest {
         writer.writeTimestampNtz(TimestampNtz.fromMillis(1698235273182L), 1);
         writer.writeTimestampNtz(TimestampNtz.fromMillis(1698235273182L), 5);
         writer.writeTimestampLtz(TimestampLtz.fromEpochMillis(1698235273182L, 
45678), 1);
-        writer.setNullAt(18);
+        writer.writeTimestampLtz(TimestampLtz.fromEpochMillis(1698235273182L, 
123456), 6);
         indexedRow.pointTo(writer.segment(), 0, writer.position());
         return indexedRow;
     }
@@ -118,7 +118,7 @@ class PaimonKeyEncoderTest {
         binaryRowWriter.writeTimestamp(15, 
Timestamp.fromEpochMillis(1698235273182L), 1);
         binaryRowWriter.writeTimestamp(16, 
Timestamp.fromEpochMillis(1698235273182L), 5);
         binaryRowWriter.writeTimestamp(17, 
Timestamp.fromEpochMillis(1698235273182L, 45678), 1);
-        binaryRowWriter.setNullAt(18);
+        binaryRowWriter.writeTimestamp(18, 
Timestamp.fromEpochMillis(1698235273182L, 123456), 6);
         binaryRowWriter.complete();
         return binaryRow;
     }
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java 
b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java
index 8594cf224..5ee50776a 100644
--- a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java
+++ b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java
@@ -750,6 +750,20 @@ public class DataTestUtils {
         assertLogRecordsEquals(DEFAULT_SCHEMA_ID, rowType, logRecords, 
expectedValue, schemaGetter);
     }
 
+    public static void assertLogRecordsEquals(
+            RowType rowType,
+            LogRecords logRecords,
+            List<Object[]> expectedValue,
+            ChangeType expectedChangeType,
+            SchemaGetter schemaGetter) {
+        List<Tuple2<ChangeType, Object[]>> expectedValueWithRowKind =
+                expectedValue.stream()
+                        .map(val -> Tuple2.of(expectedChangeType, val))
+                        .collect(Collectors.toList());
+        assertLogRecordsEqualsWithRowKind(
+                DEFAULT_SCHEMA_ID, rowType, logRecords, 
expectedValueWithRowKind, schemaGetter);
+    }
+
     public static void assertLogRecordsEquals(
             int schemaId,
             RowType rowType,
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto 
b/fluss-rpc/src/main/proto/FlussApi.proto
index dc05efd83..b85487a50 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -246,6 +246,9 @@ message PutKvResponse {
 message LookupRequest {
   required int64 table_id = 1;
   repeated PbLookupReqForBucket buckets_req = 2;
+  optional bool insert_if_not_exists = 3;
+  optional int32 acks = 4;
+  optional int32 timeout_ms = 5;
 }
 
 message LookupResponse {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index 0eee04c6c..0c711f4f3 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -328,6 +328,12 @@ public final class KvTablet {
                     short latestSchemaId = (short) schemaInfo.getSchemaId();
                     validateSchemaId(kvRecords.schemaId(), latestSchemaId);
 
+                    AutoIncrementUpdater currentAutoIncrementUpdater =
+                            autoIncrementManager.getUpdaterForSchema(kvFormat, 
latestSchemaId);
+
+                    // Validate targetColumns doesn't contain auto-increment column
+                    validateTargetColumns(targetColumns, currentAutoIncrementUpdater, latestSchema);
+
                     // Determine the row merger based on mergeMode:
                     // - DEFAULT: Use the configured merge engine (rowMerger)
                     // - OVERWRITE: Bypass merge engine, use pre-created 
overwriteRowMerger
@@ -339,8 +345,6 @@ public final class KvTablet {
                                             targetColumns, latestSchemaId, 
latestSchema)
                                     : rowMerger.configureTargetColumns(
                                             targetColumns, latestSchemaId, 
latestSchema);
-                    AutoIncrementUpdater currentAutoIncrementUpdater =
-                            autoIncrementManager.getUpdaterForSchema(kvFormat, 
latestSchemaId);
 
                     RowType latestRowType = latestSchema.getRowType();
                     WalBuilder walBuilder = createWalBuilder(latestSchemaId, 
latestRowType);
@@ -407,6 +411,21 @@ public final class KvTablet {
         }
     }
 
+    private void validateTargetColumns(
+            int[] targetColumns, AutoIncrementUpdater autoIncrementUpdater, Schema schema) {
+        if (!autoIncrementUpdater.hasAutoIncrement() || targetColumns == null) {
+            return;
+        }
+        List<String> autoIncrementColumnNames = schema.getAutoIncrementColumnNames();
+        for (int colIdx : targetColumns) {
+            if (autoIncrementColumnNames.contains(schema.getColumnName(colIdx))) {
+                throw new IllegalArgumentException(
+                        "targetColumns must not include auto-increment column name: "
+                                + schema.getColumnName(colIdx));
+            }
+        }
+    }
+
     private void processKvRecords(
             KvRecordBatch kvRecords,
             short schemaIdOfNewData,
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index a9bfde8e2..d1010918e 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -34,11 +34,13 @@ import 
org.apache.fluss.exception.UnsupportedVersionException;
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.metrics.MetricNames;
 import org.apache.fluss.metrics.groups.MetricGroup;
+import org.apache.fluss.record.KeyRecordBatch;
 import org.apache.fluss.record.KvRecordBatch;
 import org.apache.fluss.record.MemoryLogRecords;
 import org.apache.fluss.record.ProjectionPushdownCache;
@@ -123,6 +125,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -138,6 +141,7 @@ import java.util.stream.Stream;
 import static org.apache.fluss.config.ConfigOptions.KV_FORMAT_VERSION_2;
 import static org.apache.fluss.server.TabletManagerBase.getTableInfo;
 import static org.apache.fluss.utils.FileUtils.isDirectoryEmpty;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
 import static org.apache.fluss.utils.Preconditions.checkState;
 import static org.apache.fluss.utils.concurrent.LockUtils.inLock;
 
@@ -580,6 +584,53 @@ public class ReplicaManager {
                 timeoutMs, requiredAcks, entriesPerBucket.size(), kvPutResult, 
responseCallback);
     }
 
+    /** Context for tracking missing keys that need to be inserted. */
+    public static class MissingKeysContext {
+        final List<Integer> missingIndexes;
+        final List<byte[]> missingKeys;
+
+        MissingKeysContext(List<Integer> missingIndexes, List<byte[]> 
missingKeys) {
+            this.missingIndexes = missingIndexes;
+            this.missingKeys = missingKeys;
+        }
+    }
+
+    /**
+     * Collect missing keys from lookup results for insertion, populating both 
context and batch
+     * maps.
+     */
+    private void collectMissingKeysForInsert(
+            Map<TableBucket, List<byte[]>> entriesPerBucket,
+            Map<TableBucket, LookupResultForBucket> lookupResults,
+            Map<TableBucket, MissingKeysContext> missingKeysContextMap,
+            Map<TableBucket, KvRecordBatch> kvRecordBatchMap) {
+        for (Map.Entry<TableBucket, List<byte[]>> entry : 
entriesPerBucket.entrySet()) {
+            TableBucket tb = entry.getKey();
+            LookupResultForBucket lookupResult = lookupResults.get(tb);
+            if (lookupResult.failed()) {
+                continue;
+            }
+            List<Integer> missingIndexes = new ArrayList<>();
+            List<byte[]> missingKeys = new ArrayList<>();
+            List<byte[]> requestedKeys = entry.getValue();
+            List<byte[]> lookupValues = lookupResult.lookupValues();
+
+            for (int i = 0; i < requestedKeys.size(); i++) {
+                if (lookupValues.get(i) == null) {
+                    missingKeys.add(requestedKeys.get(i));
+                    missingIndexes.add(i);
+                }
+            }
+            if (!missingKeys.isEmpty()) {
+                missingKeysContextMap.put(tb, new 
MissingKeysContext(missingIndexes, missingKeys));
+                kvRecordBatchMap.put(
+                        tb,
+                        KeyRecordBatch.create(
+                                missingKeys, 
getReplicaOrException(tb).getTableInfo()));
+            }
+        }
+    }
+
     /** Lookup a single key value. */
     @VisibleForTesting
     protected void lookup(TableBucket tableBucket, byte[] key, 
Consumer<byte[]> responseCallback) {
@@ -598,12 +649,22 @@ public class ReplicaManager {
                 });
     }
 
+    public void lookups(
+            Map<TableBucket, List<byte[]>> entriesPerBucket,
+            short apiVersion,
+            Consumer<Map<TableBucket, LookupResultForBucket>> 
responseCallback) {
+        lookups(false, null, null, entriesPerBucket, apiVersion, 
responseCallback);
+    }
+
     /**
      * Lookup with multi key from leader replica of the buckets.
      *
      * @param apiVersion the client API version for backward compatibility 
validation
      */
     public void lookups(
+            boolean insertIfNotExists,
+            @Nullable Integer timeoutMs,
+            @Nullable Integer requiredAcks,
             Map<TableBucket, List<byte[]>> entriesPerBucket,
             short apiVersion,
             Consumer<Map<TableBucket, LookupResultForBucket>> 
responseCallback) {
@@ -633,8 +694,96 @@ public class ReplicaManager {
                         tb, new LookupResultForBucket(tb, 
ApiError.fromThrowable(e)));
             }
         }
+
+        if (insertIfNotExists) {
+            // Lookup-with-insert-if-not-exists flow:
+            // 1. Initial lookup may find some keys missing
+            // 2. For missing keys, we call putKv to insert them
+            // 3. Concurrent lookups on the same keys may all see them as 
missing, leading to
+            //    multiple putKv calls for the same keys (becomes UPDATE 
operations in
+            // processUpsert)
+            // 4. Auto-increment columns are excluded from targetColumns to 
prevent overwriting
+            checkArgument(
+                    timeoutMs != null && requiredAcks != null,
+                    "timeoutMs and requiredAcks must be set");
+            Map<TableBucket, MissingKeysContext> entriesPerBucketToInsert = 
new HashMap<>();
+            Map<TableBucket, KvRecordBatch> produceEntryData = new HashMap<>();
+            collectMissingKeysForInsert(
+                    entriesPerBucket,
+                    lookupResultForBucketMap,
+                    entriesPerBucketToInsert,
+                    produceEntryData);
+
+            if (!produceEntryData.isEmpty()) {
+                // Compute target columns: exclude auto-increment to prevent 
overwriting
+                TableBucket firstBucket = 
produceEntryData.keySet().iterator().next();
+                Schema schema = 
getReplicaOrException(firstBucket).getTableInfo().getSchema();
+
+                // TODO: Performance optimization: during 
lookup-with-insert-if-not-exists flow,
+                // the original key bytes are wrapped in KeyRecordBatch, then 
during putRecordsToKv
+                // they are decoded to rows and immediately re-encoded back to 
key bytes, causing
+                // redundant encode/decode overhead.
+                putRecordsToKv(
+                        timeoutMs,
+                        requiredAcks,
+                        produceEntryData,
+                        schema.getPrimaryKeyIndexes(),
+                        MergeMode.DEFAULT,
+                        apiVersion,
+                        (result) ->
+                                responseCallback.accept(
+                                        reLookupAndMerge(
+                                                result,
+                                                entriesPerBucketToInsert,
+                                                lookupResultForBucketMap)));
+            } else {
+                // All keys exist, directly return lookup results
+                responseCallback.accept(lookupResultForBucketMap);
+            }
+        } else {
+            responseCallback.accept(lookupResultForBucketMap);
+        }
         LOG.debug("Lookup from local kv in {}ms", System.currentTimeMillis() - 
startTime);
-        responseCallback.accept(lookupResultForBucketMap);
+    }
+
+    /**
+     * Re-lookup missing keys after insert and merge results back into the 
original lookup result
+     * map.
+     */
+    private Map<TableBucket, LookupResultForBucket> reLookupAndMerge(
+            List<PutKvResultForBucket> putKvResultForBucketList,
+            Map<TableBucket, MissingKeysContext> entriesPerBucketToInsert,
+            Map<TableBucket, LookupResultForBucket> lookupResultForBucketMap) {
+        // Collect failed buckets first to avoid unnecessary re-lookup
+        Set<TableBucket> failedBuckets = new HashSet<>();
+        for (PutKvResultForBucket putKvResultForBucket : 
putKvResultForBucketList) {
+            if (putKvResultForBucket.failed()) {
+                TableBucket tb = putKvResultForBucket.getTableBucket();
+                failedBuckets.add(tb);
+                lookupResultForBucketMap.put(
+                        tb, new LookupResultForBucket(tb, 
putKvResultForBucket.getError()));
+            }
+        }
+
+        // Re-lookup only for successful inserts
+        for (Map.Entry<TableBucket, MissingKeysContext> entry :
+                entriesPerBucketToInsert.entrySet()) {
+            TableBucket tb = entry.getKey();
+            if (failedBuckets.contains(tb)) {
+                continue; // Skip buckets that failed during insert
+            }
+            MissingKeysContext missingKeysContext = entry.getValue();
+            List<byte[]> results =
+                    
getReplicaOrException(tb).lookups(missingKeysContext.missingKeys);
+            LookupResultForBucket lookupResult = 
lookupResultForBucketMap.get(tb);
+            for (int i = 0; i < missingKeysContext.missingIndexes.size(); i++) 
{
+                lookupResult
+                        .lookupValues()
+                        .set(missingKeysContext.missingIndexes.get(i), 
results.get(i));
+            }
+        }
+
+        return lookupResultForBucketMap;
     }
 
     /**
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
index 145cd1f0f..df2ab926f 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
@@ -246,18 +246,29 @@ public final class TabletService extends RpcServiceBase 
implements TabletServerG
     public CompletableFuture<LookupResponse> lookup(LookupRequest request) {
         Map<TableBucket, List<byte[]>> lookupData = toLookupData(request);
         Map<TableBucket, LookupResultForBucket> errorResponseMap = new 
HashMap<>();
-        Map<TableBucket, List<byte[]>> interesting =
-                authorizeRequestData(
-                        READ, lookupData, errorResponseMap, 
LookupResultForBucket::new);
-        if (interesting.isEmpty()) {
-            return 
CompletableFuture.completedFuture(makeLookupResponse(errorResponseMap));
-        }
-
         CompletableFuture<LookupResponse> response = new CompletableFuture<>();
-        replicaManager.lookups(
-                lookupData,
-                currentSession().getApiVersion(),
-                value -> response.complete(makeLookupResponse(value, 
errorResponseMap)));
+
+        if (request.hasInsertIfNotExists() && request.isInsertIfNotExists()) {
+            authorizeTable(WRITE, request.getTableId());
+            replicaManager.lookups(
+                    request.isInsertIfNotExists(),
+                    request.getTimeoutMs(),
+                    request.getAcks(),
+                    lookupData,
+                    currentSession().getApiVersion(),
+                    value -> response.complete(makeLookupResponse(value, 
errorResponseMap)));
+        } else {
+            Map<TableBucket, List<byte[]>> interesting =
+                    authorizeRequestData(
+                            READ, lookupData, errorResponseMap, 
LookupResultForBucket::new);
+            if (interesting.isEmpty()) {
+                return 
CompletableFuture.completedFuture(makeLookupResponse(errorResponseMap));
+            }
+            replicaManager.lookups(
+                    lookupData,
+                    currentSession().getApiVersion(),
+                    value -> response.complete(makeLookupResponse(value, 
errorResponseMap)));
+        }
         return response;
     }
 
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
index fc8a4a054..8472964ab 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
@@ -26,6 +26,7 @@ import 
org.apache.fluss.exception.InvalidRequiredAcksException;
 import org.apache.fluss.exception.NotLeaderOrFollowerException;
 import org.apache.fluss.exception.UnknownTableOrBucketException;
 import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.SchemaGetter;
@@ -41,7 +42,10 @@ import org.apache.fluss.record.LogRecordBatch;
 import org.apache.fluss.record.LogRecordReadContext;
 import org.apache.fluss.record.LogRecords;
 import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.record.TestingSchemaGetter;
+import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.encode.CompactedKeyEncoder;
+import org.apache.fluss.row.encode.ValueDecoder;
 import org.apache.fluss.row.encode.ValueEncoder;
 import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
 import org.apache.fluss.rpc.entity.LimitScanResultForBucket;
@@ -92,8 +96,12 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.fluss.config.ConfigOptions.KV_FORMAT_VERSION_2;
@@ -110,6 +118,10 @@ import static 
org.apache.fluss.record.TestData.DATA1_TABLE_ID;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
+import static org.apache.fluss.record.TestData.DATA3_ROW_TYPE;
+import static org.apache.fluss.record.TestData.DATA3_SCHEMA_PK_AUTO_INC;
+import static org.apache.fluss.record.TestData.DATA3_TABLE_ID_PK_AUTO_INC;
+import static org.apache.fluss.record.TestData.DATA3_TABLE_PATH_PK_AUTO_INC;
 import static org.apache.fluss.record.TestData.DATA_1_WITH_KEY_AND_VALUE;
 import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
 import static 
org.apache.fluss.record.TestData.EXPECTED_LOG_RESULTS_FOR_DATA_1_WITH_PK;
@@ -793,6 +805,294 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 });
     }
 
+    @Test
+    void testLookupWithInsertIfNotExists() throws Exception {
+        TableBucket tb = new TableBucket(DATA1_TABLE_ID_PK, 1);
+        makeKvTableAsLeader(DATA1_TABLE_ID_PK, DATA1_TABLE_PATH_PK, 
tb.getBucket());
+
+        CompactedKeyEncoder keyEncoder = new 
CompactedKeyEncoder(DATA1_ROW_TYPE, new int[] {0});
+
+        // Scenario 1: All keys missing - should insert and return new values
+        byte[] key100 = keyEncoder.encodeKey(row(new Object[] {100}));
+        byte[] key200 = keyEncoder.encodeKey(row(new Object[] {200}));
+
+        List<byte[]> inserted = lookupWithInsert(tb, Arrays.asList(key100, 
key200)).lookupValues();
+        assertThat(inserted).hasSize(2).allMatch(Objects::nonNull);
+        verifyLookup(tb, key100, inserted.get(0));
+        verifyLookup(tb, key200, inserted.get(1));
+
+        // Verify that the 2 corresponding log records were created
+        FetchLogResultForBucket logResult = fetchLog(tb, 0L);
+        assertThat(logResult.getHighWatermark()).isEqualTo(2L);
+        LogRecords records = logResult.records();
+        TestingSchemaGetter schemaGetter =
+                new TestingSchemaGetter(DEFAULT_SCHEMA_ID, DATA1_SCHEMA_PK);
+        List<Object[]> expected = Arrays.asList(new Object[] {100, null}, new 
Object[] {200, null});
+        assertLogRecordsEquals(DATA1_ROW_TYPE, records, expected, 
ChangeType.INSERT, schemaGetter);
+
+        // Scenario 2: All keys exist - should return existing values without 
modification
+        List<byte[]> existing = lookupWithInsert(tb, Arrays.asList(key100, 
key200)).lookupValues();
+        assertThat(existing).containsExactlyElementsOf(inserted);
+
+        // Verify that no new log records were created
+        logResult = fetchLog(tb, 0L);
+        assertThat(logResult.getHighWatermark()).isEqualTo(2L);
+
+        // Scenario 3: Mixed - key100 exists, key300 missing
+        byte[] key300 = keyEncoder.encodeKey(row(new Object[] {300}));
+        List<byte[]> mixed = lookupWithInsert(tb, Arrays.asList(key100, 
key300)).lookupValues();
+        assertThat(mixed.get(0)).isEqualTo(inserted.get(0)); // existing
+        assertThat(mixed.get(1)).isNotNull(); // newly inserted
+        verifyLookup(tb, key300, mixed.get(1));
+
+        // Verify that only one new log record was created for key300
+        logResult = fetchLog(tb, 2L);
+        assertThat(logResult.getHighWatermark()).isEqualTo(3L);
+        records = logResult.records();
+        expected = Collections.singletonList(new Object[] {300, null});
+        assertLogRecordsEquals(DATA1_ROW_TYPE, records, expected, 
ChangeType.INSERT, schemaGetter);
+    }
+
+    @Test
+    void testLookupWithInsertIfNotExistsAutoIncrement() throws Exception {
+        TableBucket tb = new TableBucket(DATA3_TABLE_ID_PK_AUTO_INC, 1);
+        makeKvTableAsLeader(
+                DATA3_TABLE_ID_PK_AUTO_INC, DATA3_TABLE_PATH_PK_AUTO_INC, 
tb.getBucket());
+
+        // Encode only the key field 'a' (index 0) - decoder returns only key 
fields, not full row
+        CompactedKeyEncoder keyEncoder = new 
CompactedKeyEncoder(DATA3_ROW_TYPE, new int[] {0});
+
+        // Lookup missing keys - should insert with auto-generated values for 
column 'c'
+        byte[] key1 = keyEncoder.encodeKey(row(new Object[] {100}));
+        byte[] key2 = keyEncoder.encodeKey(row(new Object[] {200}));
+
+        List<byte[]> inserted = lookupWithInsert(tb, Arrays.asList(key1, 
key2)).lookupValues();
+        assertThat(inserted).hasSize(2).allMatch(Objects::nonNull);
+
+        // Decode values to verify auto-increment column values
+        TestingSchemaGetter schemaGetter =
+                new TestingSchemaGetter(DEFAULT_SCHEMA_ID, 
DATA3_SCHEMA_PK_AUTO_INC);
+        ValueDecoder valueDecoder = new ValueDecoder(schemaGetter, 
KvFormat.COMPACTED);
+
+        InternalRow row1 = valueDecoder.decodeValue(inserted.get(0)).row;
+        InternalRow row2 = valueDecoder.decodeValue(inserted.get(1)).row;
+
+        // Auto-increment values should be sequential
+        assertThat(row1.getLong(2)).isEqualTo(1L);
+        assertThat(row2.getLong(2)).isEqualTo(2L);
+
+        // Lookup existing keys - should return same values without 
modification
+        List<byte[]> existing = lookupWithInsert(tb, Arrays.asList(key1, 
key2)).lookupValues();
+        assertThat(existing).containsExactlyElementsOf(inserted);
+
+        // Mixed scenario - key1 exists, key3 missing
+        byte[] key3 = keyEncoder.encodeKey(row(new Object[] {300}));
+        List<byte[]> mixed = lookupWithInsert(tb, Arrays.asList(key1, 
key3)).lookupValues();
+        assertThat(mixed.get(0)).isEqualTo(inserted.get(0)); // existing 
unchanged
+
+        InternalRow row3 = valueDecoder.decodeValue(mixed.get(1)).row;
+        assertThat(row3.getLong(2)).isEqualTo(3L); // continues sequence
+
+        FetchLogResultForBucket logResult = fetchLog(tb, 0L);
+        assertThat(logResult.getHighWatermark()).isEqualTo(3L);
+        LogRecords records = logResult.records();
+        List<Object[]> expected =
+                Arrays.asList(
+                        new Object[] {100, null, 1L},
+                        new Object[] {200, null, 2L},
+                        new Object[] {300, null, 3L});
+        TestingSchemaGetter schemaGetter2 =
+                new TestingSchemaGetter(DEFAULT_SCHEMA_ID, 
DATA3_SCHEMA_PK_AUTO_INC);
+        assertLogRecordsEquals(DATA3_ROW_TYPE, records, expected, 
ChangeType.INSERT, schemaGetter2);
+    }
+
+    // FIXME: this test tolerates a nondeterministic WAL size (3-15 records) because
+    // concurrent lookups may race and re-insert the same keys as UPDATEs; tighten the
+    // assertions if/when duplicate-insert races are eliminated.
+    @Test
+    void testConcurrentLookupWithInsertIfNotExistsAutoIncrement() throws 
Exception {
+        TableBucket tb = new TableBucket(DATA3_TABLE_ID_PK_AUTO_INC, 1);
+        makeKvTableAsLeader(
+                DATA3_TABLE_ID_PK_AUTO_INC, DATA3_TABLE_PATH_PK_AUTO_INC, 
tb.getBucket());
+
+        CompactedKeyEncoder keyEncoder = new 
CompactedKeyEncoder(DATA3_ROW_TYPE, new int[] {0});
+
+        // Create keys that all threads will look up
+        byte[] key100 = keyEncoder.encodeKey(row(new Object[] {100}));
+        byte[] key200 = keyEncoder.encodeKey(row(new Object[] {200}));
+        byte[] key300 = keyEncoder.encodeKey(row(new Object[] {300}));
+        List<byte[]> keys = Arrays.asList(key100, key200, key300);
+
+        int numThreads = 3;
+        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch doneLatch = new CountDownLatch(numThreads);
+
+        // Store results from each thread
+        @SuppressWarnings("unchecked")
+        List<byte[]>[] threadResults = new List[numThreads];
+
+        for (int i = 0; i < numThreads; i++) {
+            final int threadIndex = i;
+            executor.submit(
+                    () -> {
+                        try {
+                            // Wait for all threads to be ready
+                            startLatch.await();
+                            // Perform concurrent lookupWithInsert
+                            LookupResultForBucket result = 
lookupWithInsert(tb, keys);
+                            threadResults[threadIndex] = result.lookupValues();
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        } finally {
+                            doneLatch.countDown();
+                        }
+                    });
+        }
+
+        // Start all threads simultaneously
+        startLatch.countDown();
+        // Wait for all threads to complete
+        assertThat(doneLatch.await(30, TimeUnit.SECONDS)).isTrue();
+        executor.shutdown();
+
+        // Verify all threads received non-null values
+        for (int i = 0; i < numThreads; i++) {
+            assertThat(threadResults[i])
+                    .as("Thread %d results should not be null", i)
+                    .isNotNull()
+                    .hasSize(3)
+                    .allMatch(Objects::nonNull);
+        }
+
+        // Verify all threads received the same values for each key 
(consistency check)
+        for (int keyIndex = 0; keyIndex < 3; keyIndex++) {
+            byte[] referenceValue = threadResults[0].get(keyIndex);
+            for (int threadIndex = 1; threadIndex < numThreads; threadIndex++) 
{
+                assertThat(threadResults[threadIndex].get(keyIndex))
+                        .as(
+                                "Thread %d should have same value as thread 0 
for key index %d",
+                                threadIndex, keyIndex)
+                        .isEqualTo(referenceValue);
+            }
+        }
+
+        // Verify auto-increment values are sequential and unique
+        TestingSchemaGetter schemaGetter =
+                new TestingSchemaGetter(DEFAULT_SCHEMA_ID, 
DATA3_SCHEMA_PK_AUTO_INC);
+        ValueDecoder valueDecoder = new ValueDecoder(schemaGetter, 
KvFormat.COMPACTED);
+
+        Set<Long> autoIncrementValues = new HashSet<>();
+        for (byte[] value : threadResults[0]) {
+            InternalRow row = valueDecoder.decodeValue(value).row;
+            autoIncrementValues.add(row.getLong(2));
+        }
+
+        // Should have exactly 3 unique auto-increment values
+        assertThat(autoIncrementValues).hasSize(3);
+
+        // Values should be 1, 2, 3 (in any order due to concurrency)
+        assertThat(autoIncrementValues).containsExactlyInAnyOrder(1L, 2L, 3L);
+
+        // Verify WAL: 3 INSERTs minimum, up to 15 if all concurrent updates 
execute
+        FetchLogResultForBucket logResult = fetchLog(tb, 0L);
+        assertThat(logResult.getHighWatermark()).isBetween(3L, 15L);
+
+        // Decode all WAL records
+        LogRecords records = logResult.records();
+        LogRecordReadContext readContext =
+                LogRecordReadContext.createArrowReadContext(
+                        DATA3_ROW_TYPE,
+                        DEFAULT_SCHEMA_ID,
+                        new TestingSchemaGetter(DEFAULT_SCHEMA_ID, 
DATA3_SCHEMA_PK_AUTO_INC));
+
+        List<Tuple2<ChangeType, Long>> recordsWithAutoInc = new ArrayList<>();
+        for (LogRecordBatch batch : records.batches()) {
+            try (CloseableIterator<LogRecord> iterator = 
batch.records(readContext)) {
+                while (iterator.hasNext()) {
+                    LogRecord record = iterator.next();
+                    recordsWithAutoInc.add(
+                            Tuple2.of(record.getChangeType(), 
record.getRow().getLong(2)));
+                }
+            }
+        }
+
+        // First 3 must be INSERTs with unique auto-increment values (1, 2, 3)
+        assertThat(recordsWithAutoInc).hasSizeGreaterThanOrEqualTo(3);
+        Set<Long> insertAutoIncs = new HashSet<>();
+        for (int i = 0; i < 3; i++) {
+            
assertThat(recordsWithAutoInc.get(i).f0).isEqualTo(ChangeType.INSERT);
+            insertAutoIncs.add(recordsWithAutoInc.get(i).f1);
+        }
+        assertThat(insertAutoIncs).containsExactlyInAnyOrder(1L, 2L, 3L);
+
+        // Remaining 12 must be UPDATE pairs (UPDATE_BEFORE + UPDATE_AFTER) 
preserving
+        // auto-increment
+        for (int i = 3; i < recordsWithAutoInc.size(); i++) {
+            Tuple2<ChangeType, Long> record = recordsWithAutoInc.get(i);
+            assertThat(record.f0).isIn(ChangeType.UPDATE_BEFORE, 
ChangeType.UPDATE_AFTER);
+            assertThat(record.f1).isIn(1L, 2L, 3L);
+        }
+    }
+
+    @Test
+    void testLookupWithInsertIfNotExistsMultiBucket() throws Exception {
+        TableBucket tb0 = new TableBucket(DATA1_TABLE_ID_PK, 0);
+        TableBucket tb1 = new TableBucket(DATA1_TABLE_ID_PK, 1);
+        makeKvTableAsLeader(DATA1_TABLE_ID_PK, DATA1_TABLE_PATH_PK, 
tb0.getBucket());
+        makeKvTableAsLeader(DATA1_TABLE_ID_PK, DATA1_TABLE_PATH_PK, 
tb1.getBucket());
+
+        CompactedKeyEncoder keyEncoder = new 
CompactedKeyEncoder(DATA1_ROW_TYPE, new int[] {0});
+        byte[] key0 = keyEncoder.encodeKey(row(new Object[] {0}));
+        byte[] key1 = keyEncoder.encodeKey(row(new Object[] {1}));
+
+        verifyLookup(tb0, key0, null);
+        verifyLookup(tb1, key1, null);
+
+        Map<TableBucket, List<byte[]>> requestMap = new HashMap<>();
+        requestMap.put(tb0, Collections.singletonList(key0));
+        requestMap.put(tb1, Collections.singletonList(key1));
+
+        // Insert missing keys across buckets
+        CompletableFuture<Map<TableBucket, LookupResultForBucket>> future =
+                new CompletableFuture<>();
+        replicaManager.lookups(true, 20000, 1, requestMap, LOOKUP_KV_VERSION, 
future::complete);
+        Map<TableBucket, LookupResultForBucket> inserted = future.get(5, 
TimeUnit.SECONDS);
+
+        byte[] value0 = inserted.get(tb0).lookupValues().get(0);
+        byte[] value1 = inserted.get(tb1).lookupValues().get(0);
+
+        // Verify inserted values via lookup
+        verifyLookup(tb0, key0, value0);
+        verifyLookup(tb1, key1, value1);
+    }
+
+    private LookupResultForBucket lookupWithInsert(TableBucket tb, 
List<byte[]> keys)
+            throws Exception {
+        CompletableFuture<Map<TableBucket, LookupResultForBucket>> future =
+                new CompletableFuture<>();
+        replicaManager.lookups(
+                true,
+                20000,
+                1,
+                Collections.singletonMap(tb, keys),
+                LOOKUP_KV_VERSION,
+                future::complete);
+        LookupResultForBucket result = future.get(5, TimeUnit.SECONDS).get(tb);
+        assertThat(result.failed()).isFalse();
+        return result;
+    }
+
+    private FetchLogResultForBucket fetchLog(TableBucket tb, long fetchOffset) 
throws Exception {
+        CompletableFuture<Map<TableBucket, FetchLogResultForBucket>> future =
+                new CompletableFuture<>();
+        replicaManager.fetchLogRecords(
+                buildFetchParams(-1, Integer.MAX_VALUE),
+                Collections.singletonMap(
+                        tb, new FetchReqInfo(tb.getTableId(), fetchOffset, 
Integer.MAX_VALUE)),
+                null,
+                future::complete);
+        return future.get().get(tb);
+    }
+
     @Test
     void testPrefixLookup() throws Exception {
         TablePath tablePath = TablePath.of("test_db_1", 
"test_prefix_lookup_t1");
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
index 559353df6..3fb01a7e2 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
@@ -106,6 +106,10 @@ import static 
org.apache.fluss.record.TestData.DATA2_SCHEMA;
 import static org.apache.fluss.record.TestData.DATA2_TABLE_DESCRIPTOR;
 import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
 import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH;
+import static org.apache.fluss.record.TestData.DATA3_SCHEMA_PK_AUTO_INC;
+import static 
org.apache.fluss.record.TestData.DATA3_TABLE_DESCRIPTOR_PK_AUTO_INC;
+import static org.apache.fluss.record.TestData.DATA3_TABLE_ID_PK_AUTO_INC;
+import static org.apache.fluss.record.TestData.DATA3_TABLE_PATH_PK_AUTO_INC;
 import static 
org.apache.fluss.server.coordinator.CoordinatorContext.INITIAL_COORDINATOR_EPOCH;
 import static 
org.apache.fluss.server.replica.ReplicaManager.HIGH_WATERMARK_CHECKPOINT_FILE_NAME;
 import static 
org.apache.fluss.server.zk.data.LeaderAndIsr.INITIAL_BUCKET_EPOCH;
@@ -266,6 +270,12 @@ public class ReplicaTestBase {
                 DATA2_TABLE_PATH,
                 TableRegistration.newTable(DATA2_TABLE_ID, 
DATA2_TABLE_DESCRIPTOR));
         zkClient.registerFirstSchema(DATA2_TABLE_PATH, DATA2_SCHEMA);
+
+        zkClient.registerTable(
+                DATA3_TABLE_PATH_PK_AUTO_INC,
+                TableRegistration.newTable(
+                        DATA3_TABLE_ID_PK_AUTO_INC, 
DATA3_TABLE_DESCRIPTOR_PK_AUTO_INC));
+        zkClient.registerFirstSchema(DATA3_TABLE_PATH_PK_AUTO_INC, 
DATA3_SCHEMA_PK_AUTO_INC);
     }
 
     protected long registerTableInZkClient(
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
index 5a28f9e2b..1137e499a 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
@@ -20,6 +20,7 @@ package org.apache.fluss.server.tablet;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.InvalidRequiredAcksException;
+import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.Schema;
@@ -30,7 +31,9 @@ import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.DefaultKvRecordBatch;
 import org.apache.fluss.record.DefaultValueRecordBatch;
 import org.apache.fluss.record.TestingSchemaGetter;
+import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.encode.CompactedKeyEncoder;
+import org.apache.fluss.row.encode.ValueDecoder;
 import org.apache.fluss.row.encode.ValueEncoder;
 import org.apache.fluss.rpc.gateway.TabletServerGateway;
 import org.apache.fluss.rpc.messages.FetchLogResponse;
@@ -973,4 +976,100 @@ public class TabletServiceITCase {
         return ServerRpcMessageUtils.makeNotifyLeaderAndIsrRequest(
                 0, Collections.singletonList(reqForBucket));
     }
+
+    // Verifies lookup with insertIfNotExists=true against the bucket leader:
+    // keys missing from the KV tablet are inserted on first lookup, and later
+    // lookups of the same keys return the previously stored values unchanged.
+    @Test
+    void testLookupWithInsertIfNotExists() throws Exception {
+        long tableId =
+                createTable(
+                        FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH_PK, 
DATA1_TABLE_DESCRIPTOR_PK);
+        TableBucket tb = new TableBucket(tableId, 0);
+        FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
+
+        int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
+        TabletServerGateway gateway = 
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);
+        // Lookup keys are encoded from the primary-key column (index 0).
+        CompactedKeyEncoder keyEncoder = new 
CompactedKeyEncoder(DATA1_ROW_TYPE, new int[] {0});
+
+        byte[] key1 = keyEncoder.encodeKey(row(new Object[] {100}));
+        byte[] key2 = keyEncoder.encodeKey(row(new Object[] {200}));
+        byte[] key3 = keyEncoder.encodeKey(row(new Object[] {300}));
+
+        // Insert missing keys: first lookup of key1/key2 succeeds and returns
+        // the newly inserted values.
+        PbLookupRespForBucket resp1 =
+                gateway.lookup(newLookupRequest(tableId, 0, true, 20000, 1, 
key1, key2))
+                        .get()
+                        .getBucketsRespAt(0);
+        assertThat(resp1.hasErrorCode()).isFalse();
+        byte[] value1 = resp1.getValuesList().get(0).getValues();
+        byte[] value2 = resp1.getValuesList().get(1).getValues();
+        assertThat(value1).isNotNull();
+        assertThat(value2).isNotNull();
+
+        // Existing keys return same values: a second lookup must not re-insert
+        // or alter what was stored by the first request.
+        PbLookupRespForBucket resp2 =
+                gateway.lookup(newLookupRequest(tableId, 0, true, 20000, 1, 
key1, key2))
+                        .get()
+                        .getBucketsRespAt(0);
+        assertThat(resp2.getValuesList().get(0).getValues()).isEqualTo(value1);
+        assertThat(resp2.getValuesList().get(1).getValues()).isEqualTo(value2);
+
+        // Mixed: key1 exists, key3 missing; one batch may hit and insert at once.
+        PbLookupRespForBucket resp3 =
+                gateway.lookup(newLookupRequest(tableId, 0, true, 20000, 1, 
key1, key3))
+                        .get()
+                        .getBucketsRespAt(0);
+        assertThat(resp3.getValuesList().get(0).getValues()).isEqualTo(value1);
+        assertThat(resp3.getValuesList().get(1).getValues()).isNotNull();
+    }
+
+    // Verifies insertIfNotExists lookups on a table with an auto-increment
+    // column "b": each newly inserted key receives the next sequence value
+    // (1, 2, 3, ...), while lookups of existing keys keep their original value.
+    @Test
+    void testLookupWithInsertIfNotExistsAutoIncrement() throws Exception {
+        TablePath tablePath = TablePath.of("test_db_1", 
"test_auto_increment_t1");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.BIGINT())
+                        .withComment("auto increment column")
+                        .column("c", DataTypes.STRING())
+                        .enableAutoIncrement("b")
+                        .primaryKey("a")
+                        .build();
+        RowType rowType = schema.getRowType();
+        TableDescriptor descriptor =
+                TableDescriptor.builder().schema(schema).distributedBy(1, 
"a").build();
+        long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, 
descriptor);
+        TableBucket tb = new TableBucket(tableId, 0);
+        FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
+
+        int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
+        TabletServerGateway gateway = 
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);
+        CompactedKeyEncoder keyEncoder = new CompactedKeyEncoder(rowType, new 
int[] {0});
+        // Decoder to turn returned value bytes back into rows so the
+        // auto-increment column can be asserted.
+        TestingSchemaGetter schemaGetter = new 
TestingSchemaGetter(DEFAULT_SCHEMA_ID, schema);
+        ValueDecoder valueDecoder = new ValueDecoder(schemaGetter, 
KvFormat.COMPACTED);
+
+        byte[] key1 = keyEncoder.encodeKey(row(new Object[] {100}));
+        byte[] key2 = keyEncoder.encodeKey(row(new Object[] {200}));
+        byte[] key3 = keyEncoder.encodeKey(row(new Object[] {300}));
+
+        // Insert key1, key2 - auto-increment: 1, 2
+        PbLookupRespForBucket resp1 =
+                gateway.lookup(newLookupRequest(tableId, 0, true, 20000, 1, 
key1, key2))
+                        .get()
+                        .getBucketsRespAt(0);
+        assertThat(resp1.hasErrorCode()).isFalse();
+        InternalRow row1 = 
valueDecoder.decodeValue(resp1.getValuesList().get(0).getValues()).row;
+        InternalRow row2 = 
valueDecoder.decodeValue(resp1.getValuesList().get(1).getValues()).row;
+        assertThat(row1.getLong(1)).isEqualTo(1L);
+        assertThat(row2.getLong(1)).isEqualTo(2L);
+
+        // Mixed: key1 exists (unchanged), key3 missing (gets 3)
+        PbLookupRespForBucket resp2 =
+                gateway.lookup(newLookupRequest(tableId, 0, true, 20000, 1, 
key1, key3))
+                        .get()
+                        .getBucketsRespAt(0);
+        InternalRow existingRow =
+                
valueDecoder.decodeValue(resp2.getValuesList().get(0).getValues()).row;
+        InternalRow newRow = 
valueDecoder.decodeValue(resp2.getValuesList().get(1).getValues()).row;
+        assertThat(existingRow.getLong(1)).isEqualTo(1L);
+        assertThat(newRow.getLong(1)).isEqualTo(3L);
+    }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
index 5055105e9..41c4ee967 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
@@ -284,6 +284,27 @@ public class RpcMessageTestUtils {
         return lookupRequest;
     }
 
+    /**
+     * Builds a single-bucket {@link LookupRequest} carrying the given keys, with
+     * the insertIfNotExists flag, request timeout, and acks set explicitly.
+     *
+     * @param tableId id of the table to look up
+     * @param bucketId bucket the keys belong to
+     * @param insertIfNotExists whether missing keys should be inserted on lookup
+     * @param timeoutMs request timeout in milliseconds
+     * @param acks acks required for the insert path
+     * @param keys encoded primary keys to look up
+     */
+    public static LookupRequest newLookupRequest(
+            long tableId,
+            int bucketId,
+            boolean insertIfNotExists,
+            int timeoutMs,
+            int acks,
+            byte[]... keys) {
+        LookupRequest lookupRequest =
+                new LookupRequest()
+                        .setTableId(tableId)
+                        .setInsertIfNotExists(insertIfNotExists)
+                        .setTimeoutMs(timeoutMs)
+                        .setAcks(acks);
+        // All keys go into one bucket-level sub-request.
+        PbLookupReqForBucket pbLookupReqForBucket = 
lookupRequest.addBucketsReq();
+        pbLookupReqForBucket.setBucketId(bucketId);
+        for (byte[] key : keys) {
+            pbLookupReqForBucket.addKey(key);
+        }
+        return lookupRequest;
+    }
+
     public static PrefixLookupRequest newPrefixLookupRequest(
             long tableId, int bucketId, List<byte[]> prefixKeys) {
         PrefixLookupRequest prefixLookupRequest = new 
PrefixLookupRequest().setTableId(tableId);
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index 56678e7e8..443359c32 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -337,6 +337,7 @@
                                         <exclude>
                                             
org.apache.fluss.row.columnar.BytesColumnVector.Bytes
                                         </exclude>
+                                        
<exclude>org.apache.fluss.row.decode.KeyDecoder</exclude>
                                         
<exclude>org.apache.fluss.row.encode.RowEncoder</exclude>
                                         
<exclude>org.apache.fluss.row.encode.KeyEncoder</exclude>
                                         
<exclude>org.apache.fluss.table.*</exclude>

Reply via email to the mailing list.