This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e848975c [kv] Add OVERWRITE MergeMode support for undo recovery in aggregation tables (#2492)
1e848975c is described below

commit 1e848975c1a905013150a87902ba44cbe149c64b
Author: Yang Wang <[email protected]>
AuthorDate: Fri Jan 30 19:04:00 2026 +0800

    [kv] Add OVERWRITE MergeMode support for undo recovery in aggregation tables (#2492)
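
    For reference, a minimal client-side usage sketch (the "table" handle is
    assumed to come from a Fluss connection; this mirrors the javadoc example
    added to Upsert.java below):

        // Regular upsert: rows go through the configured merge engine.
        UpsertWriter writer = table.newUpsert()
                .mergeMode(MergeMode.DEFAULT)
                .createWriter();

        // Undo recovery: bypass the merge engine and restore exact values.
        UpsertWriter undoWriter = table.newUpsert()
                .mergeMode(MergeMode.OVERWRITE)
                .createWriter();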
---
 .../fluss/client/table/writer/TableUpsert.java     |  42 +-
 .../apache/fluss/client/table/writer/Upsert.java   |  46 +-
 .../client/table/writer/UpsertWriterImpl.java      |  21 +-
 .../fluss/client/utils/ClientRpcMessageUtils.java  |  21 +-
 .../apache/fluss/client/write/KvWriteBatch.java    |  19 +
 .../fluss/client/write/RecordAccumulator.java      |   1 +
 .../org/apache/fluss/client/write/WriteRecord.java |  79 ++-
 .../client/utils/ClientRpcMessageUtilsTest.java    | 144 ++++++
 .../fluss/client/write/KvWriteBatchTest.java       |  84 ++++
 .../org/apache/fluss/rpc/protocol/MergeMode.java   | 119 +++++
 fluss-rpc/src/main/proto/FlussApi.proto            |   8 +
 .../java/org/apache/fluss/server/kv/KvTablet.java  |  38 +-
 .../org/apache/fluss/server/replica/Replica.java   |   8 +-
 .../fluss/server/replica/ReplicaManager.java       |   8 +-
 .../apache/fluss/server/tablet/TabletService.java  |   7 +
 .../fluss/server/kv/KvTabletMergeModeTest.java     | 557 +++++++++++++++++++++
 .../fluss/server/replica/ReplicaManagerTest.java   |  25 +-
 .../apache/fluss/server/replica/ReplicaTest.java   |   5 +-
 18 files changed, 1203 insertions(+), 29 deletions(-)

diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java
index fe865eac4..ad2d0990d 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java
@@ -18,12 +18,16 @@
 package org.apache.fluss.client.table.writer;
 
 import org.apache.fluss.client.write.WriterClient;
+import org.apache.fluss.metadata.MergeEngineType;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.rpc.protocol.MergeMode;
 import org.apache.fluss.types.RowType;
 
 import javax.annotation.Nullable;
 
+import java.util.Optional;
+
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /** API for configuring and creating {@link UpsertWriter}. */
@@ -32,22 +36,24 @@ public class TableUpsert implements Upsert {
     private final TablePath tablePath;
     private final TableInfo tableInfo;
     private final WriterClient writerClient;
-
     private final @Nullable int[] targetColumns;
+    private final MergeMode mergeMode;
 
     public TableUpsert(TablePath tablePath, TableInfo tableInfo, WriterClient writerClient) {
-        this(tablePath, tableInfo, writerClient, null);
+        this(tablePath, tableInfo, writerClient, null, MergeMode.DEFAULT);
     }
 
     private TableUpsert(
             TablePath tablePath,
             TableInfo tableInfo,
             WriterClient writerClient,
-            @Nullable int[] targetColumns) {
+            @Nullable int[] targetColumns,
+            MergeMode mergeMode) {
         this.tablePath = tablePath;
         this.tableInfo = tableInfo;
         this.writerClient = writerClient;
         this.targetColumns = targetColumns;
+        this.mergeMode = mergeMode;
     }
 
     @Override
@@ -68,7 +74,7 @@ public class TableUpsert implements Upsert {
                 }
             }
         }
-        return new TableUpsert(tablePath, tableInfo, writerClient, targetColumns);
+        return new TableUpsert(tablePath, tableInfo, writerClient, targetColumns, this.mergeMode);
     }
 
     @Override
@@ -91,13 +97,39 @@ public class TableUpsert implements Upsert {
         return partialUpdate(targetColumns);
     }
 
+    @Override
+    public Upsert mergeMode(MergeMode mode) {
+        checkNotNull(mode, "merge mode");
+        return new TableUpsert(tablePath, tableInfo, writerClient, this.targetColumns, mode);
+    }
+
     @Override
     public UpsertWriter createWriter() {
-        return new UpsertWriterImpl(tablePath, tableInfo, targetColumns, writerClient);
+        // Check that OVERWRITE mode is only used with tables that have a merge engine
+        if (mergeMode == MergeMode.OVERWRITE) {
+            Optional<MergeEngineType> mergeEngineType =
+                    tableInfo.getTableConfig().getMergeEngineType();
+            if (!mergeEngineType.isPresent()) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "MergeMode %s is only supported for tables 
with a merge engine, "
+                                        + "but table %s does not have a merge 
engine configured.",
+                                mergeMode, tablePath));
+            }
+        }
+        return new UpsertWriterImpl(tablePath, tableInfo, targetColumns, writerClient, mergeMode);
     }
 
     @Override
     public <T> TypedUpsertWriter<T> createTypedWriter(Class<T> pojoClass) {
+        // TypedUpsertWriterImpl doesn't support mergeMode yet
+        if (mergeMode != MergeMode.DEFAULT) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "TypedUpsertWriter does not support MergeMode %s 
yet. "
+                                    + "Please use createWriter() instead.",
+                            mergeMode));
+        }
         return new TypedUpsertWriterImpl<>(createWriter(), pojoClass, tableInfo, targetColumns);
     }
 }
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java
index 0843437fe..e8c859272 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java
@@ -18,6 +18,7 @@
 package org.apache.fluss.client.table.writer;
 
 import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.rpc.protocol.MergeMode;
 
 import javax.annotation.Nullable;
 
@@ -26,7 +27,8 @@ import javax.annotation.Nullable;
  * Table.
  *
  * <p>{@link Upsert} objects are immutable and can be shared between threads. Refinement methods,
- * like {@link #partialUpdate}, create new Upsert instances.
+ * like {@link #partialUpdate(int[])} and {@link #mergeMode(MergeMode)}, create new Upsert
+ * instances.
  *
  * @since 0.6
  */
@@ -56,9 +58,49 @@ public interface Upsert {
      */
     Upsert partialUpdate(String... targetColumnNames);
 
+    /**
+     * Specifies the merge mode for the UpsertWriter and returns a new Upsert instance.
+     *
+     * <p>This method controls how the created UpsertWriter handles data merging for tables with
+     * merge engines:
+     *
+     * <ul>
+     *   <li>{@link MergeMode#DEFAULT} (default): Data is merged through the server-side merge
+     *       engine. The behavior depends on the configured merge engine type:
+     *       <ul>
+     *         <li>For aggregation merge engine: applies aggregation functions (e.g., SUM, MAX,
+     *             MIN).
+     *         <li>For first-row merge engine: retains the first observed row.
+     *         <li>For versioned merge engine: keeps the row with the highest version.
+     *       </ul>
+     *   <li>{@link MergeMode#OVERWRITE}: Data directly overwrites values, bypassing the merge
+     *       engine. This is useful for undo/recovery operations to restore exact historical values.
+     * </ul>
+     * </ul>
+     *
+     * <p>Example usage:
+     *
+     * <pre>{@code
+     * // Default merge mode
+     * UpsertWriter normalWriter = table.newUpsert()
+     *     .mergeMode(MergeMode.DEFAULT)
+     *     .createWriter();
+     *
+     * // Overwrite mode for undo recovery
+     * UpsertWriter undoWriter = table.newUpsert()
+     *     .mergeMode(MergeMode.OVERWRITE)
+     *     .createWriter();
+     * }</pre>
+     *
+     * @param mode the merge mode
+     * @return a new Upsert instance with the specified merge mode
+     * @since 0.9
+     */
+    Upsert mergeMode(MergeMode mode);
+
     /**
      * Create a new {@link UpsertWriter} using {@code InternalRow} with the optional {@link
-     * #partialUpdate(String...)} information to upsert and delete data to a Primary Key Table.
+     * #partialUpdate(String...)} and {@link #mergeMode(MergeMode)} information to upsert and delete
+     * data to a Primary Key Table.
      */
     UpsertWriter createWriter();
 
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
index 9a612e090..d2f52d5ca 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
@@ -30,6 +30,7 @@ import org.apache.fluss.row.compacted.CompactedRow;
 import org.apache.fluss.row.encode.KeyEncoder;
 import org.apache.fluss.row.encode.RowEncoder;
 import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.rpc.protocol.MergeMode;
 import org.apache.fluss.types.RowType;
 
 import javax.annotation.Nullable;
@@ -55,11 +56,23 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
     private final FieldGetter[] fieldGetters;
     private final TableInfo tableInfo;
 
+    /** The merge mode for this writer. This controls how the server handles data merging. */
+    private final MergeMode mergeMode;
+
     UpsertWriterImpl(
             TablePath tablePath,
             TableInfo tableInfo,
             @Nullable int[] partialUpdateColumns,
             WriterClient writerClient) {
+        this(tablePath, tableInfo, partialUpdateColumns, writerClient, MergeMode.DEFAULT);
+    }
+
+    UpsertWriterImpl(
+            TablePath tablePath,
+            TableInfo tableInfo,
+            @Nullable int[] partialUpdateColumns,
+            WriterClient writerClient,
+            MergeMode mergeMode) {
         super(tablePath, tableInfo, writerClient);
         RowType rowType = tableInfo.getRowType();
         sanityCheck(
@@ -88,7 +101,9 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
         this.writeFormat = WriteFormat.fromKvFormat(this.kvFormat);
         this.rowEncoder = RowEncoder.create(kvFormat, rowType);
         this.fieldGetters = InternalRow.createFieldGetters(rowType);
+
         this.tableInfo = tableInfo;
+        this.mergeMode = mergeMode;
     }
 
     private static void sanityCheck(
@@ -173,7 +188,8 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
                         key,
                         bucketKey,
                         writeFormat,
-                        targetColumns);
+                        targetColumns,
+                        mergeMode);
         return send(record).thenApply(ignored -> UPSERT_SUCCESS);
     }
 
@@ -196,7 +212,8 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
                         key,
                         bucketKey,
                         writeFormat,
-                        targetColumns);
+                        targetColumns,
+                        mergeMode);
         return send(record).thenApply(ignored -> DELETE_SUCCESS);
     }
 
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
index f6b37f5d1..68d8712f1 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
@@ -79,6 +79,7 @@ import org.apache.fluss.rpc.messages.PrefixLookupRequest;
 import org.apache.fluss.rpc.messages.ProduceLogRequest;
 import org.apache.fluss.rpc.messages.PutKvRequest;
 import org.apache.fluss.rpc.messages.RegisterProducerOffsetsRequest;
+import org.apache.fluss.rpc.protocol.MergeMode;
 import org.apache.fluss.utils.json.DataTypeJsonSerde;
 import org.apache.fluss.utils.json.JsonSerdeUtils;
 
@@ -138,11 +139,12 @@ public class ClientRpcMessageUtils {
                         .setTimeoutMs(maxRequestTimeoutMs);
         // check the target columns in the batch list should be the same. If not same,
         // we throw exception directly currently.
-        int[] targetColumns =
-                ((KvWriteBatch) readyWriteBatches.get(0).writeBatch()).getTargetColumns();
+        KvWriteBatch firstBatch = (KvWriteBatch) readyWriteBatches.get(0).writeBatch();
+        int[] targetColumns = firstBatch.getTargetColumns();
+        MergeMode mergeMode = firstBatch.getMergeMode();
         for (int i = 1; i < readyWriteBatches.size(); i++) {
-            int[] currentBatchTargetColumns =
-                    ((KvWriteBatch) readyWriteBatches.get(i).writeBatch()).getTargetColumns();
+            KvWriteBatch currentBatch = (KvWriteBatch) readyWriteBatches.get(i).writeBatch();
+            int[] currentBatchTargetColumns = currentBatch.getTargetColumns();
             if (!Arrays.equals(targetColumns, currentBatchTargetColumns)) {
                 throw new IllegalStateException(
                         String.format(
@@ -151,10 +153,21 @@ public class ClientRpcMessageUtils {
                                 Arrays.toString(targetColumns),
                                 Arrays.toString(currentBatchTargetColumns)));
             }
+            // Validate mergeMode consistency across batches
+            if (currentBatch.getMergeMode() != mergeMode) {
+                throw new IllegalStateException(
+                        String.format(
+                                "All the write batches to make put kv request 
should have the same mergeMode, "
+                                        + "but got %s and %s.",
+                                mergeMode, currentBatch.getMergeMode()));
+            }
         }
         if (targetColumns != null) {
             request.setTargetColumns(targetColumns);
         }
+        // Set mergeMode in the request - this is the proper way to pass mergeMode to server
+        request.setAggMode(mergeMode.getProtoValue());
+
         readyWriteBatches.forEach(
                 readyBatch -> {
                     TableBucket tableBucket = readyBatch.tableBucket();
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java b/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java
index 72f596be9..a72d36717 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java
@@ -28,6 +28,7 @@ import org.apache.fluss.record.bytesview.BytesView;
 import org.apache.fluss.row.BinaryRow;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.rpc.messages.PutKvRequest;
+import org.apache.fluss.rpc.protocol.MergeMode;
 
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
@@ -51,6 +52,7 @@ public class KvWriteBatch extends WriteBatch {
     private final KvRecordBatchBuilder recordsBuilder;
     private final @Nullable int[] targetColumns;
     private final int schemaId;
+    private final MergeMode mergeMode;
 
     public KvWriteBatch(
             int bucketId,
@@ -60,6 +62,7 @@ public class KvWriteBatch extends WriteBatch {
             int writeLimit,
             AbstractPagedOutputView outputView,
             @Nullable int[] targetColumns,
+            MergeMode mergeMode,
             long createdMs) {
         super(bucketId, physicalTablePath, createdMs);
         this.outputView = outputView;
@@ -67,6 +70,7 @@ public class KvWriteBatch extends WriteBatch {
                 KvRecordBatchBuilder.builder(schemaId, writeLimit, outputView, kvFormat);
         this.targetColumns = targetColumns;
         this.schemaId = schemaId;
+        this.mergeMode = mergeMode;
     }
 
     @Override
@@ -94,6 +98,16 @@ public class KvWriteBatch extends WriteBatch {
                             Arrays.toString(targetColumns)));
         }
 
+        // Validate mergeMode consistency - records with different mergeMode cannot be batched
+        // together
+        if (writeRecord.getMergeMode() != this.mergeMode) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot mix records with different mergeMode in 
the same batch. "
+                                    + "Batch mergeMode: %s, Record mergeMode: 
%s",
+                            this.mergeMode, writeRecord.getMergeMode()));
+        }
+
         byte[] key = writeRecord.getKey();
         checkNotNull(key, "key must be not null for kv record");
         checkNotNull(callback, "write callback must be not null");
@@ -113,6 +127,10 @@ public class KvWriteBatch extends WriteBatch {
         return targetColumns;
     }
 
+    public MergeMode getMergeMode() {
+        return mergeMode;
+    }
+
     @Override
     public BytesView build() {
         try {
@@ -163,6 +181,7 @@ public class KvWriteBatch extends WriteBatch {
         recordsBuilder.abort();
     }
 
+    @Override
     public void resetWriterState(long writerId, int batchSequence) {
         super.resetWriterState(writerId, batchSequence);
         recordsBuilder.resetWriterState(writerId, batchSequence);
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
index 0c834a49f..a1f0544a5 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
@@ -617,6 +617,7 @@ public final class RecordAccumulator {
                         outputView.getPreAllocatedSize(),
                         outputView,
                         writeRecord.getTargetColumns(),
+                        writeRecord.getMergeMode(),
                         clock.milliseconds());
 
             case ARROW_LOG:
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
index 9265c5f77..f25562be7 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
@@ -27,6 +27,7 @@ import org.apache.fluss.row.BinaryRow;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.compacted.CompactedRow;
 import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.rpc.protocol.MergeMode;
 
 import javax.annotation.Nullable;
 
@@ -52,6 +53,27 @@ public final class WriteRecord {
             byte[] bucketKey,
             WriteFormat writeFormat,
             @Nullable int[] targetColumns) {
+        return forUpsert(
+                tableInfo,
+                tablePath,
+                row,
+                key,
+                bucketKey,
+                writeFormat,
+                targetColumns,
+                MergeMode.DEFAULT);
+    }
+
+    /** Create a write record for upsert operation with merge mode control. */
+    public static WriteRecord forUpsert(
+            TableInfo tableInfo,
+            PhysicalTablePath tablePath,
+            BinaryRow row,
+            byte[] key,
+            byte[] bucketKey,
+            WriteFormat writeFormat,
+            @Nullable int[] targetColumns,
+            MergeMode mergeMode) {
         checkNotNull(row, "row must not be null");
         checkNotNull(key, "key must not be null");
         checkNotNull(bucketKey, "bucketKey must not be null");
@@ -65,7 +87,8 @@ public final class WriteRecord {
                 row,
                 writeFormat,
                 targetColumns,
-                estimatedSizeInBytes);
+                estimatedSizeInBytes,
+                mergeMode);
     }
 
     /** Create a write record for delete operation and partial-delete update. */
@@ -76,6 +99,25 @@ public final class WriteRecord {
             byte[] bucketKey,
             WriteFormat writeFormat,
             @Nullable int[] targetColumns) {
+        return forDelete(
+                tableInfo,
+                tablePath,
+                key,
+                bucketKey,
+                writeFormat,
+                targetColumns,
+                MergeMode.DEFAULT);
+    }
+
+    /** Create a write record for delete operation with merge mode control. */
+    public static WriteRecord forDelete(
+            TableInfo tableInfo,
+            PhysicalTablePath tablePath,
+            byte[] key,
+            byte[] bucketKey,
+            WriteFormat writeFormat,
+            @Nullable int[] targetColumns,
+            MergeMode mergeMode) {
         checkNotNull(key, "key must not be null");
         checkNotNull(bucketKey, "key must not be null");
         checkArgument(writeFormat.isKv(), "writeFormat must be a KV format");
@@ -88,7 +130,8 @@ public final class WriteRecord {
                 null,
                 writeFormat,
                 targetColumns,
-                estimatedSizeInBytes);
+                estimatedSizeInBytes,
+                mergeMode);
     }
 
     /** Create a write record for append operation for indexed format. */
@@ -108,7 +151,8 @@ public final class WriteRecord {
                 row,
                 WriteFormat.INDEXED_LOG,
                 null,
-                estimatedSizeInBytes);
+                estimatedSizeInBytes,
+                MergeMode.DEFAULT);
     }
 
     /** Creates a write record for append operation for Arrow format. */
@@ -129,7 +173,8 @@ public final class WriteRecord {
                 row,
                 WriteFormat.ARROW_LOG,
                 null,
-                estimatedSizeInBytes);
+                estimatedSizeInBytes,
+                MergeMode.DEFAULT);
     }
 
     /** Creates a write record for append operation for Compacted format. */
@@ -149,7 +194,8 @@ public final class WriteRecord {
                 row,
                 WriteFormat.COMPACTED_LOG,
                 null,
-                estimatedSizeInBytes);
+                estimatedSizeInBytes,
+                MergeMode.DEFAULT);
     }
 
     // ------------------------------------------------------------------------------------------
@@ -166,6 +212,16 @@ public final class WriteRecord {
     private final int estimatedSizeInBytes;
     private final TableInfo tableInfo;
 
+    /**
+     * The merge mode for this record. This controls how the server handles data merging.
+     *
+     * <ul>
+     *   <li>DEFAULT: Normal merge through server-side merge engine
+     *   <li>OVERWRITE: Bypass merge engine, directly replace values (for undo recovery)
+     * </ul>
+     */
+    private final MergeMode mergeMode;
+
     private WriteRecord(
             TableInfo tableInfo,
             PhysicalTablePath physicalTablePath,
@@ -174,7 +230,8 @@ public final class WriteRecord {
             @Nullable InternalRow row,
             WriteFormat writeFormat,
             @Nullable int[] targetColumns,
-            int estimatedSizeInBytes) {
+            int estimatedSizeInBytes,
+            MergeMode mergeMode) {
         this.tableInfo = tableInfo;
         this.physicalTablePath = physicalTablePath;
         this.key = key;
@@ -183,6 +240,7 @@ public final class WriteRecord {
         this.writeFormat = writeFormat;
         this.targetColumns = targetColumns;
         this.estimatedSizeInBytes = estimatedSizeInBytes;
+        this.mergeMode = mergeMode;
     }
 
     public PhysicalTablePath getPhysicalTablePath() {
@@ -214,6 +272,15 @@ public final class WriteRecord {
         return writeFormat;
     }
 
+    /**
+     * Get the merge mode for this record.
+     *
+     * @return the merge mode
+     */
+    public MergeMode getMergeMode() {
+        return mergeMode;
+    }
+
     /**
      * Get the estimated size in bytes of the record with batch header.
      *
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java b/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java
new file mode 100644
index 000000000..5b1ca58c7
--- /dev/null
+++ b/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.utils;
+
+import org.apache.fluss.client.write.KvWriteBatch;
+import org.apache.fluss.client.write.ReadyWriteBatch;
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.memory.PreAllocatedPagedOutputView;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.rpc.messages.PutKvRequest;
+import org.apache.fluss.rpc.protocol.MergeMode;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link ClientRpcMessageUtils}.
+ *
+ * <p>Focuses on MergeMode consistency validation in makePutKvRequest.
+ */
+class ClientRpcMessageUtilsTest {
+
+    private static final long TABLE_ID = DATA1_TABLE_ID_PK;
+    private static final int ACKS = 1;
+    private static final int TIMEOUT_MS = 30000;
+
+    @Test
+    void testMakePutKvRequestWithConsistentMergeMode() throws Exception {
+        // Create two batches with same mergeMode (DEFAULT)
+        KvWriteBatch batch1 = createKvWriteBatch(0, MergeMode.DEFAULT);
+        KvWriteBatch batch2 = createKvWriteBatch(1, MergeMode.DEFAULT);
+
+        List<ReadyWriteBatch> readyBatches =
+                Arrays.asList(
+                        new ReadyWriteBatch(new TableBucket(TABLE_ID, 0), batch1),
+                        new ReadyWriteBatch(new TableBucket(TABLE_ID, 1), batch2));
+
+        // Should succeed without exception
+        PutKvRequest request =
+                ClientRpcMessageUtils.makePutKvRequest(TABLE_ID, ACKS, TIMEOUT_MS, readyBatches);
+
+        // Verify mergeMode is set correctly in request
+        assertThat(request.hasAggMode()).isTrue();
+        assertThat(request.getAggMode()).isEqualTo(MergeMode.DEFAULT.getProtoValue());
+    }
+
+    @Test
+    void testMakePutKvRequestWithOverwriteMode() throws Exception {
+        // Create batches with OVERWRITE mode
+        KvWriteBatch batch1 = createKvWriteBatch(0, MergeMode.OVERWRITE);
+        KvWriteBatch batch2 = createKvWriteBatch(1, MergeMode.OVERWRITE);
+
+        List<ReadyWriteBatch> readyBatches =
+                Arrays.asList(
+                        new ReadyWriteBatch(new TableBucket(TABLE_ID, 0), batch1),
+                        new ReadyWriteBatch(new TableBucket(TABLE_ID, 1), batch2));
+
+        PutKvRequest request =
+                ClientRpcMessageUtils.makePutKvRequest(TABLE_ID, ACKS, TIMEOUT_MS, readyBatches);
+
+        // Verify OVERWRITE mode is set correctly
+        assertThat(request.hasAggMode()).isTrue();
+        assertThat(request.getAggMode()).isEqualTo(MergeMode.OVERWRITE.getProtoValue());
+    }
+
+    @Test
+    void testMakePutKvRequestWithInconsistentMergeMode() throws Exception {
+        // Create batches with different mergeModes
+        KvWriteBatch defaultBatch = createKvWriteBatch(0, MergeMode.DEFAULT);
+        KvWriteBatch overwriteBatch = createKvWriteBatch(1, MergeMode.OVERWRITE);
+
+        List<ReadyWriteBatch> readyBatches =
+                Arrays.asList(
+                        new ReadyWriteBatch(new TableBucket(TABLE_ID, 0), defaultBatch),
+                        new ReadyWriteBatch(new TableBucket(TABLE_ID, 1), overwriteBatch));
+
+        // Should throw exception due to inconsistent mergeMode
+        assertThatThrownBy(
+                        () ->
+                                ClientRpcMessageUtils.makePutKvRequest(
+                                        TABLE_ID, ACKS, TIMEOUT_MS, readyBatches))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "All the write batches to make put kv request should 
have the same mergeMode")
+                .hasMessageContaining("DEFAULT")
+                .hasMessageContaining("OVERWRITE");
+    }
+
+    @Test
+    void testMakePutKvRequestWithSingleBatch() throws Exception {
+        // Single batch should always succeed
+        KvWriteBatch batch = createKvWriteBatch(0, MergeMode.OVERWRITE);
+
+        List<ReadyWriteBatch> readyBatches =
+                Collections.singletonList(new ReadyWriteBatch(new TableBucket(TABLE_ID, 0), batch));
+
+        PutKvRequest request =
+                ClientRpcMessageUtils.makePutKvRequest(TABLE_ID, ACKS, TIMEOUT_MS, readyBatches);
+
+        assertThat(request.getAggMode()).isEqualTo(MergeMode.OVERWRITE.getProtoValue());
+    }
+
+    private KvWriteBatch createKvWriteBatch(int bucketId, MergeMode mergeMode) throws Exception {
+        MemorySegment segment = MemorySegment.allocateHeapMemory(1024);
+        PreAllocatedPagedOutputView outputView =
+                new PreAllocatedPagedOutputView(Collections.singletonList(segment));
+        return new KvWriteBatch(
+                bucketId,
+                PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
+                DATA1_TABLE_INFO_PK.getSchemaId(),
+                KvFormat.COMPACTED,
+                Integer.MAX_VALUE,
+                outputView,
+                null,
+                mergeMode,
+                System.currentTimeMillis());
+    }
+}
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
index c62c7b029..3fe428388 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
@@ -33,6 +33,7 @@ import org.apache.fluss.record.KvRecordReadContext;
 import org.apache.fluss.record.TestingSchemaGetter;
 import org.apache.fluss.row.BinaryRow;
 import org.apache.fluss.row.encode.CompactedKeyEncoder;
+import org.apache.fluss.rpc.protocol.MergeMode;
 import org.apache.fluss.types.DataType;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -227,6 +228,7 @@ class KvWriteBatchTest {
                 writeLimit,
                 outputView,
                 null,
+                MergeMode.DEFAULT,
                 System.currentTimeMillis());
     }
 
@@ -254,4 +256,86 @@ class KvWriteBatchTest {
         assertThat(toArray(kvRecord.getKey())).isEqualTo(key);
         assertThat(kvRecord.getRow()).isEqualTo(row);
     }
+
+    // ==================== MergeMode Tests ====================
+
+    @Test
+    void testMergeModeConsistencyValidation() throws Exception {
+        // Create batch with DEFAULT mode
+        KvWriteBatch defaultBatch =
+                createKvWriteBatchWithMergeMode(
+                        new TableBucket(DATA1_TABLE_ID_PK, 0), MergeMode.DEFAULT);
+
+        // Append record with DEFAULT mode should succeed
+        WriteRecord defaultRecord = createWriteRecordWithMergeMode(MergeMode.DEFAULT);
+        assertThat(defaultBatch.tryAppend(defaultRecord, newWriteCallback())).isTrue();
+
+        // Append record with OVERWRITE mode should fail
+        WriteRecord overwriteRecord = createWriteRecordWithMergeMode(MergeMode.OVERWRITE);
+        assertThatThrownBy(() -> defaultBatch.tryAppend(overwriteRecord, newWriteCallback()))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "Cannot mix records with different mergeMode in the 
same batch")
+                .hasMessageContaining("Batch mergeMode: DEFAULT")
+                .hasMessageContaining("Record mergeMode: OVERWRITE");
+    }
+
+    @Test
+    void testOverwriteModeBatch() throws Exception {
+        // Create batch with OVERWRITE mode
+        KvWriteBatch overwriteBatch =
+                createKvWriteBatchWithMergeMode(
+                        new TableBucket(DATA1_TABLE_ID_PK, 0), MergeMode.OVERWRITE);
+
+        // Verify batch has correct mergeMode
+        assertThat(overwriteBatch.getMergeMode()).isEqualTo(MergeMode.OVERWRITE);
+
+        // Append record with OVERWRITE mode should succeed
+        WriteRecord overwriteRecord = createWriteRecordWithMergeMode(MergeMode.OVERWRITE);
+        assertThat(overwriteBatch.tryAppend(overwriteRecord, newWriteCallback())).isTrue();
+
+        // Append record with DEFAULT mode should fail
+        WriteRecord defaultRecord = createWriteRecordWithMergeMode(MergeMode.DEFAULT);
+        assertThatThrownBy(() -> overwriteBatch.tryAppend(defaultRecord, newWriteCallback()))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "Cannot mix records with different mergeMode in the 
same batch")
+                .hasMessageContaining("Batch mergeMode: OVERWRITE")
+                .hasMessageContaining("Record mergeMode: DEFAULT");
+    }
+
+    @Test
+    void testDefaultMergeModeIsDefault() throws Exception {
+        KvWriteBatch batch = createKvWriteBatch(new TableBucket(DATA1_TABLE_ID_PK, 0));
+        assertThat(batch.getMergeMode()).isEqualTo(MergeMode.DEFAULT);
+    }
+
+    private KvWriteBatch createKvWriteBatchWithMergeMode(TableBucket tb, MergeMode mergeMode)
+            throws Exception {
+        PreAllocatedPagedOutputView outputView =
+                new PreAllocatedPagedOutputView(
+                        Collections.singletonList(memoryPool.nextSegment()));
+        return new KvWriteBatch(
+                tb.getBucket(),
+                PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
+                DATA1_TABLE_INFO_PK.getSchemaId(),
+                KvFormat.COMPACTED,
+                Integer.MAX_VALUE,
+                outputView,
+                null,
+                mergeMode,
+                System.currentTimeMillis());
+    }
+
+    private WriteRecord createWriteRecordWithMergeMode(MergeMode mergeMode) {
+        return WriteRecord.forUpsert(
+                DATA1_TABLE_INFO_PK,
+                PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
+                row,
+                key,
+                key,
+                WriteFormat.COMPACTED_KV,
+                null,
+                mergeMode);
+    }
 }
diff --git a/fluss-common/src/main/java/org/apache/fluss/rpc/protocol/MergeMode.java b/fluss-common/src/main/java/org/apache/fluss/rpc/protocol/MergeMode.java
new file mode 100644
index 000000000..eb3cafbad
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/rpc/protocol/MergeMode.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.rpc.protocol;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+/**
+ * Merge mode for write operations on tables with merge engines.
+ *
+ * <p>This enum controls how the server handles data merging when writing to tables with merge
+ * engines. It applies uniformly to all merge engine types (aggregation, first-row, versioned).
+ *
+ * @since 0.9
+ */
+@PublicEvolving
+public enum MergeMode {
+
+    /**
+     * Default merge mode: Data is merged through the server-side merge engine.
+     *
+     * <p>This is the normal mode for tables with merge engines. The behavior depends on the
+     * configured merge engine type:
+     *
+     * <ul>
+     *   <li>For aggregation merge engine: applies configured aggregation functions (e.g., SUM, MAX,
+     *       MIN) to merge new values with existing values.
+     *   <li>For first-row merge engine: retains the first observed row for each primary key.
+     *   <li>For versioned merge engine: keeps the row with the highest version for each primary
+     *       key.
+     * </ul>
+     */
+    DEFAULT(0),
+
+    /**
+     * Overwrite mode: Data directly overwrites target values, bypassing the merge engine.
+     *
+     * <p>This mode is used for undo/recovery operations to restore exact historical values. When in
+     * overwrite mode, the server will not apply any merge logic and will directly replace the
+     * existing values with the new values.
+     *
+     * <p>This is similar to an "OVERWRITE INTO" statement and is typically 
used internally by the
+     * Flink connector during failover recovery to restore the state to a 
previous checkpoint.
+     *
+     * <p>This mode applies to all merge engine types.
+     */
+    OVERWRITE(1);
+
+    private final int value;
+
+    MergeMode(int value) {
+        this.value = value;
+    }
+
+    /**
+     * Returns the integer value for this merge mode.
+     *
+     * <p>This value matches the agg_mode field values in the proto definition.
+     *
+     * @return the integer value
+     */
+    public int getValue() {
+        return value;
+    }
+
+    /**
+     * Returns the proto value for this merge mode.
+     *
+     * <p>This is an alias for {@link #getValue()} for clarity when working with proto messages.
+     *
+     * @return the proto value
+     */
+    public int getProtoValue() {
+        return value;
+    }
+
+    /**
+     * Converts an integer value to a MergeMode enum.
+     *
+     * @param value the integer value
+     * @return the corresponding MergeMode, or DEFAULT if the value is invalid
+     */
+    public static MergeMode fromValue(int value) {
+        switch (value) {
+            case 0:
+                return DEFAULT;
+            case 1:
+                return OVERWRITE;
+            default:
+                return DEFAULT;
+        }
+    }
+
+    /**
+     * Converts a proto value to a MergeMode enum.
+     *
+     * <p>This is an alias for {@link #fromValue(int)} for clarity when working with proto messages.
+     *
+     * @param protoValue the proto value
+     * @return the corresponding MergeMode, or DEFAULT if the value is invalid
+     */
+    public static MergeMode fromProtoValue(int protoValue) {
+        return fromValue(protoValue);
+    }
+}
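
A note on the enum above: fromValue falls back to DEFAULT for unrecognized
integers instead of throwing, so a server can tolerate mode values it does not
know yet. A small round-trip sketch using only the MergeMode class added here:

    MergeMode.OVERWRITE.getProtoValue();   // 1
    MergeMode.fromValue(1);                // OVERWRITE
    MergeMode.fromValue(42);               // DEFAULT (unknown values fall back)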
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto
index d3b1edb96..d800549d8 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -219,6 +219,11 @@ message FetchLogResponse {
   repeated PbFetchLogRespForTable tables_resp = 1;
 }
 
+// Aggregation mode constants for write operations (using int32 instead of enum for proto3 compatibility)
+// AGG_MODE_AGGREGATE = 0: Data is aggregated through server-side merge engine (default)
+// AGG_MODE_OVERWRITE = 1: Bypass merge engine, directly replace values (for undo recovery)
+// AGG_MODE_LOCAL_AGGREGATE = 2: Client-side local aggregation (reserved for future implementation)
+
 // put kv request and response
 message PutKvRequest {
   required int32 acks = 1;
@@ -228,6 +233,9 @@ message PutKvRequest {
   // if empty, means write all columns
   repeated int32 target_columns = 4 [packed = true];
   repeated PbPutKvReqForBucket buckets_req = 5;
+  // Aggregation mode for this request (see AGG_MODE_* constants above)
+  // 0 = AGGREGATE (default), 1 = OVERWRITE, 2 = LOCAL_AGGREGATE (not yet supported)
+  optional int32 agg_mode = 6;
 }
 
 message PutKvResponse {
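
End to end, the new field is wired as follows: the client sets agg_mode from the
batch's MergeMode in ClientRpcMessageUtils (above), and the server decodes it in
TabletService (below). Sketched together:

    // client side
    request.setAggMode(mergeMode.getProtoValue());

    // server side: a missing field means DEFAULT
    MergeMode mergeMode =
            request.hasAggMode()
                    ? MergeMode.fromValue(request.getAggMode())
                    : MergeMode.DEFAULT;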
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index 79ab8eb21..0eee04c6c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -45,6 +45,7 @@ import org.apache.fluss.row.PaddingRow;
 import org.apache.fluss.row.arrow.ArrowWriterPool;
 import org.apache.fluss.row.arrow.ArrowWriterProvider;
 import org.apache.fluss.row.encode.ValueDecoder;
+import org.apache.fluss.rpc.protocol.MergeMode;
 import org.apache.fluss.server.kv.autoinc.AutoIncrementManager;
 import org.apache.fluss.server.kv.autoinc.AutoIncrementUpdater;
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer;
@@ -115,6 +116,9 @@ public final class KvTablet {
     private final KvFormat kvFormat;
     // defines how to merge rows on the same primary key
     private final RowMerger rowMerger;
+    // Pre-created DefaultRowMerger for OVERWRITE mode (undo recovery scenarios)
+    // This avoids creating a new instance on every putAsLeader call
+    private final RowMerger overwriteRowMerger;
     private final ArrowCompressionInfo arrowCompressionInfo;
     private final AutoIncrementManager autoIncrementManager;
 
@@ -166,6 +170,9 @@ public final class KvTablet {
         this.memorySegmentPool = memorySegmentPool;
         this.kvFormat = kvFormat;
         this.rowMerger = rowMerger;
+        // Pre-create DefaultRowMerger for OVERWRITE mode to avoid creating new instances
+        // on every putAsLeader call. Used for undo recovery scenarios.
+        this.overwriteRowMerger = new DefaultRowMerger(kvFormat, DeleteBehavior.ALLOW);
         this.arrowCompressionInfo = arrowCompressionInfo;
         this.schemaGetter = schemaGetter;
         this.changelogImage = changelogImage;
@@ -273,6 +280,20 @@ public final class KvTablet {
         return flushedLogOffset;
     }
 
+    /**
+     * Put the KvRecordBatch into the kv storage using the default merge mode ({@link
+     * MergeMode#DEFAULT}).
+     *
+     * <p>This is a convenience method that calls {@link #putAsLeader(KvRecordBatch, int[],
+     * MergeMode)} with {@link MergeMode#DEFAULT}.
+     *
+     * @param kvRecords the kv records to put into
+     * @param targetColumns the target columns to put, null if put all columns
+     */
+    public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] targetColumns)
+            throws Exception {
+        return putAsLeader(kvRecords, targetColumns, MergeMode.DEFAULT);
+    }
+
     /**
      * Put the KvRecordBatch into the kv storage, and return the appended wal log info.
      *
@@ -292,8 +313,10 @@ public final class KvTablet {
      *
      * @param kvRecords the kv records to put into
      * @param targetColumns the target columns to put, null if put all columns
+     * @param mergeMode the merge mode (DEFAULT or OVERWRITE)
      */
-    public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] targetColumns)
+    public LogAppendInfo putAsLeader(
+            KvRecordBatch kvRecords, @Nullable int[] targetColumns, MergeMode mergeMode)
             throws Exception {
         return inWriteLock(
                 kvLock,
@@ -305,10 +328,17 @@ public final class KvTablet {
                     short latestSchemaId = (short) schemaInfo.getSchemaId();
                     validateSchemaId(kvRecords.schemaId(), latestSchemaId);
 
-                    // we only support ADD COLUMN, so targetColumns is fine to be used directly
+                    // Determine the row merger based on mergeMode:
+                    // - DEFAULT: Use the configured merge engine (rowMerger)
+                    // - OVERWRITE: Bypass merge engine, use pre-created overwriteRowMerger
+                    //   to directly replace values (for undo recovery scenarios)
+                    // We only support ADD COLUMN, so targetColumns is fine to be used directly.
                     RowMerger currentMerger =
-                            rowMerger.configureTargetColumns(
-                                    targetColumns, latestSchemaId, latestSchema);
+                            (mergeMode == MergeMode.OVERWRITE)
+                                    ? overwriteRowMerger.configureTargetColumns(
+                                            targetColumns, latestSchemaId, latestSchema)
+                                    : rowMerger.configureTargetColumns(
+                                            targetColumns, latestSchemaId, latestSchema);
                     AutoIncrementUpdater currentAutoIncrementUpdater =
                             autoIncrementManager.getUpdaterForSchema(kvFormat, latestSchemaId);
 
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 041bcb910..b884ed88b 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -45,6 +45,7 @@ import org.apache.fluss.record.KvRecordBatch;
 import org.apache.fluss.record.LogRecords;
 import org.apache.fluss.record.MemoryLogRecords;
 import org.apache.fluss.rpc.protocol.Errors;
+import org.apache.fluss.rpc.protocol.MergeMode;
 import org.apache.fluss.server.SequenceIDCounter;
 import org.apache.fluss.server.coordinator.CoordinatorContext;
 import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
@@ -946,7 +947,10 @@ public final class Replica {
     }
 
     public LogAppendInfo putRecordsToLeader(
-            KvRecordBatch kvRecords, @Nullable int[] targetColumns, int requiredAcks)
+            KvRecordBatch kvRecords,
+            @Nullable int[] targetColumns,
+            MergeMode mergeMode,
+            int requiredAcks)
             throws Exception {
         return inReadLock(
                 leaderIsrUpdateLock,
@@ -964,7 +968,7 @@ public final class Replica {
                             kv, "KvTablet for the replica to put kv records 
shouldn't be null.");
                     LogAppendInfo logAppendInfo;
                     try {
-                        logAppendInfo = kv.putAsLeader(kvRecords, targetColumns);
+                        logAppendInfo = kv.putAsLeader(kvRecords, targetColumns, mergeMode);
                     } catch (IOException e) {
                         LOG.error("Error while putting records to {}", 
tableBucket, e);
                         fatalErrorHandler.onFatalError(e);
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index eeb7a38d2..a9bfde8e2 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -60,6 +60,7 @@ import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse;
 import org.apache.fluss.rpc.protocol.ApiError;
 import org.apache.fluss.rpc.protocol.ApiKeys;
 import org.apache.fluss.rpc.protocol.Errors;
+import org.apache.fluss.rpc.protocol.MergeMode;
 import org.apache.fluss.server.coordinator.CoordinatorContext;
 import org.apache.fluss.server.entity.FetchReqInfo;
 import org.apache.fluss.server.entity.LakeBucketOffset;
@@ -559,6 +560,7 @@ public class ReplicaManager {
             int requiredAcks,
             Map<TableBucket, KvRecordBatch> entriesPerBucket,
             @Nullable int[] targetColumns,
+            MergeMode mergeMode,
             short apiVersion,
             Consumer<List<PutKvResultForBucket>> responseCallback) {
         if (isRequiredAcksInvalid(requiredAcks)) {
@@ -567,7 +569,7 @@ public class ReplicaManager {
 
         long startTime = System.currentTimeMillis();
         Map<TableBucket, PutKvResultForBucket> kvPutResult =
-                putToLocalKv(entriesPerBucket, targetColumns, requiredAcks, apiVersion);
+                putToLocalKv(entriesPerBucket, targetColumns, mergeMode, requiredAcks, apiVersion);
         LOG.debug(
                 "Put records to local kv storage and wait generate cdc log in 
{} ms",
                 System.currentTimeMillis() - startTime);
@@ -1047,6 +1049,7 @@ public class ReplicaManager {
     private Map<TableBucket, PutKvResultForBucket> putToLocalKv(
             Map<TableBucket, KvRecordBatch> entriesPerBucket,
             @Nullable int[] targetColumns,
+            MergeMode mergeMode,
             int requiredAcks,
             short apiVersion) {
         Map<TableBucket, PutKvResultForBucket> putResultForBucketMap = new HashMap<>();
@@ -1060,7 +1063,8 @@ public class ReplicaManager {
                 tableMetrics = replica.tableMetrics();
                 tableMetrics.totalPutKvRequests().inc();
                 LogAppendInfo appendInfo =
-                        replica.putRecordsToLeader(entry.getValue(), targetColumns, requiredAcks);
+                        replica.putRecordsToLeader(
+                                entry.getValue(), targetColumns, mergeMode, requiredAcks);
                 LOG.trace(
                         "Written to local kv for {}, and the cdc log beginning 
at offset {} and ending at offset {}",
                         tb,
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
index c06c382e6..145cd1f0f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
@@ -62,6 +62,7 @@ import org.apache.fluss.rpc.messages.UpdateMetadataRequest;
 import org.apache.fluss.rpc.messages.UpdateMetadataResponse;
 import org.apache.fluss.rpc.protocol.ApiError;
 import org.apache.fluss.rpc.protocol.Errors;
+import org.apache.fluss.rpc.protocol.MergeMode;
 import org.apache.fluss.security.acl.OperationType;
 import org.apache.fluss.security.acl.Resource;
 import org.apache.fluss.server.DynamicConfigManager;
@@ -224,12 +225,18 @@ public final class TabletService extends RpcServiceBase implements TabletServerG
         authorizeTable(WRITE, request.getTableId());
 
         Map<TableBucket, KvRecordBatch> putKvData = getPutKvData(request);
+        // Get mergeMode from request, default to DEFAULT if not set
+        MergeMode mergeMode =
+                request.hasAggMode()
+                        ? MergeMode.fromValue(request.getAggMode())
+                        : MergeMode.DEFAULT;
         CompletableFuture<PutKvResponse> response = new CompletableFuture<>();
         replicaManager.putRecordsToKv(
                 request.getTimeoutMs(),
                 request.getAcks(),
                 putKvData,
                 getTargetColumns(request),
+                mergeMode,
                 currentSession().getApiVersion(),
                bucketResponse -> response.complete(makePutKvResponse(bucketResponse)));
         return response;
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletMergeModeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletMergeModeTest.java
new file mode 100644
index 000000000..ce1842309
--- /dev/null
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletMergeModeTest.java
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.TableConfig;
+import org.apache.fluss.memory.TestingMemorySegmentPool;
+import org.apache.fluss.metadata.AggFunctions;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.LogFormat;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaInfo;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.KvRecord;
+import org.apache.fluss.record.KvRecordBatch;
+import org.apache.fluss.record.KvRecordTestUtils;
+import org.apache.fluss.record.LogRecords;
+import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.record.TestingSchemaGetter;
+import org.apache.fluss.rpc.protocol.MergeMode;
+import org.apache.fluss.server.kv.autoinc.AutoIncrementManager;
+import org.apache.fluss.server.kv.autoinc.TestingSequenceGeneratorFactory;
+import org.apache.fluss.server.kv.rowmerger.RowMerger;
+import org.apache.fluss.server.log.FetchIsolation;
+import org.apache.fluss.server.log.LogTablet;
+import org.apache.fluss.server.log.LogTestUtils;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.utils.clock.SystemClock;
+import org.apache.fluss.utils.concurrent.FlussScheduler;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
+import static org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords;
+import static org.apache.fluss.testutils.LogRecordsAssert.assertThatLogRecords;
+
+/**
+ * Tests for {@link KvTablet} with {@link MergeMode} support.
+ *
+ * <p>These tests verify that OVERWRITE mode correctly bypasses the merge engine and directly
+ * replaces values, which is essential for undo recovery scenarios.
+ */
+class KvTabletMergeModeTest {
+
+    private static final short SCHEMA_ID = 1;
+
+    private final Configuration conf = new Configuration();
+    private final KvRecordTestUtils.KvRecordBatchFactory kvRecordBatchFactory =
+            KvRecordTestUtils.KvRecordBatchFactory.of(SCHEMA_ID);
+
+    private @TempDir File tempLogDir;
+    private @TempDir File tmpKvDir;
+
+    private TestingSchemaGetter schemaGetter;
+    private LogTablet logTablet;
+    private KvTablet kvTablet;
+
+    // Schema with aggregation functions for testing
+    private static final Schema AGG_SCHEMA =
+            Schema.newBuilder()
+                    .column("id", DataTypes.INT())
+                    .column("count", DataTypes.BIGINT(), AggFunctions.SUM())
+                    .column("max_val", DataTypes.INT(), AggFunctions.MAX())
+                    .column("name", DataTypes.STRING(), AggFunctions.LAST_VALUE())
+                    .primaryKey("id")
+                    .build();
+
+    private static final RowType AGG_ROW_TYPE = AGG_SCHEMA.getRowType();
+
+    private final KvRecordTestUtils.KvRecordFactory kvRecordFactory =
+            KvRecordTestUtils.KvRecordFactory.of(AGG_ROW_TYPE);
+
+    @BeforeEach
+    void setUp() throws Exception {
+        Map<String, String> config = new HashMap<>();
+        config.put("table.merge-engine", "aggregation");
+
+        TablePath tablePath = TablePath.of("testDb", "test_merge_mode");
+        PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath);
+        schemaGetter = new TestingSchemaGetter(new SchemaInfo(AGG_SCHEMA, SCHEMA_ID));
+
+        File logTabletDir =
+                LogTestUtils.makeRandomLogTabletDir(
+                        tempLogDir,
+                        physicalTablePath.getDatabaseName(),
+                        0L,
+                        physicalTablePath.getTableName());
+        logTablet =
+                LogTablet.create(
+                        physicalTablePath,
+                        logTabletDir,
+                        conf,
+                        TestingMetricGroups.TABLET_SERVER_METRICS,
+                        0,
+                        new FlussScheduler(1),
+                        LogFormat.ARROW,
+                        1,
+                        true,
+                        SystemClock.getInstance(),
+                        true);
+
+        TableBucket tableBucket = logTablet.getTableBucket();
+        TableConfig tableConf = new TableConfig(Configuration.fromMap(config));
+        RowMerger rowMerger = RowMerger.create(tableConf, KvFormat.COMPACTED, schemaGetter);
+        AutoIncrementManager autoIncrementManager =
+                new AutoIncrementManager(
+                        schemaGetter,
+                        tablePath,
+                        new TableConfig(new Configuration()),
+                        new TestingSequenceGeneratorFactory());
+
+        kvTablet =
+                KvTablet.create(
+                        physicalTablePath,
+                        tableBucket,
+                        logTablet,
+                        tmpKvDir,
+                        conf,
+                        TestingMetricGroups.TABLET_SERVER_METRICS,
+                        new RootAllocator(Long.MAX_VALUE),
+                        new TestingMemorySegmentPool(10 * 1024),
+                        KvFormat.COMPACTED,
+                        rowMerger,
+                        DEFAULT_COMPRESSION,
+                        schemaGetter,
+                        tableConf.getChangelogImage(),
+                        KvManager.getDefaultRateLimiter(),
+                        autoIncrementManager);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (kvTablet != null) {
+            kvTablet.close();
+        }
+        if (logTablet != null) {
+            logTablet.close();
+        }
+    }
+
+    // ==================== DEFAULT Mode Tests ====================
+
+    @Test
+    void testDefaultModeAppliesMergeEngine() throws Exception {
+        // Insert initial record: id=1, count=10, max_val=100, name="Alice"
+        KvRecordBatch batch1 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"}));
+        kvTablet.putAsLeader(batch1, null, MergeMode.DEFAULT);
+
+        long endOffset = logTablet.localLogEndOffset();
+
+        // Update with DEFAULT mode: count should be summed, max_val should take max
+        // id=1, count=5, max_val=150, name="Bob"
+        KvRecordBatch batch2 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 5L, 150, "Bob"}));
+        kvTablet.putAsLeader(batch2, null, MergeMode.DEFAULT);
+
+        // Verify CDC log shows aggregated values
+        LogRecords actualLogRecords = readLogRecords(endOffset);
+        MemoryLogRecords expectedLogs =
+                logRecords(
+                        endOffset,
+                        Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER),
+                        Arrays.asList(
+                                new Object[] {1, 10L, 100, "Alice"}, // before
+                                new Object[] {
+                                    1, 15L, 150, "Bob"
+                                } // after: count=10+5, max=max(100,150)
+                                ));
+
+        assertThatLogRecords(actualLogRecords)
+                .withSchema(AGG_ROW_TYPE)
+                .assertCheckSum(true)
+                .isEqualTo(expectedLogs);
+    }
+
+    // ==================== OVERWRITE Mode Tests ====================
+
+    @Test
+    void testOverwriteModeBypassesMergeEngine() throws Exception {
+        // Insert initial record with DEFAULT mode
+        KvRecordBatch batch1 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"}));
+        kvTablet.putAsLeader(batch1, null, MergeMode.DEFAULT);
+
+        long endOffset = logTablet.localLogEndOffset();
+
+        // Update with OVERWRITE mode: values should be directly replaced, not aggregated
+        // id=1, count=5, max_val=50, name="Bob"
+        KvRecordBatch batch2 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord("k1".getBytes(), new Object[] {1, 5L, 50, "Bob"}));
+        kvTablet.putAsLeader(batch2, null, MergeMode.OVERWRITE);
+
+        // Verify CDC log shows directly replaced values (not aggregated)
+        LogRecords actualLogRecords = readLogRecords(endOffset);
+        MemoryLogRecords expectedLogs =
+                logRecords(
+                        endOffset,
+                        Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER),
+                        Arrays.asList(
+                                new Object[] {1, 10L, 100, "Alice"}, // before
+                                new Object[] {
+                                    1, 5L, 50, "Bob"
+                                } // after: directly replaced, NOT aggregated
+                                ));
+
+        assertThatLogRecords(actualLogRecords)
+                .withSchema(AGG_ROW_TYPE)
+                .assertCheckSum(true)
+                .isEqualTo(expectedLogs);
+
+        // Key assertion: count=5 (not 15), max_val=50 (not 100)
+        // This proves OVERWRITE bypassed the merge engine
+    }
+
+    @Test
+    void testOverwriteModeForUndoRecoveryScenario() throws Exception {
+        // Simulate a typical undo recovery scenario:
+        // 1. Initial state: id=1, count=100, max_val=500, name="Original"
+        // 2. After some operations: id=1, count=150, max_val=600, name="Updated"
+        // 3. Undo recovery needs to restore to: id=1, count=100, max_val=500, name="Original"
+
+        // Step 1: Insert initial record
+        KvRecordBatch initialBatch =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 100L, 500, "Original"}));
+        kvTablet.putAsLeader(initialBatch, null, MergeMode.DEFAULT);
+
+        // Step 2: Simulate some aggregation operations
+        KvRecordBatch updateBatch =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 50L, 600, "Updated"}));
+        kvTablet.putAsLeader(updateBatch, null, MergeMode.DEFAULT);
+
+        long beforeUndoOffset = logTablet.localLogEndOffset();
+
+        // Step 3: Undo recovery - restore to original state using OVERWRITE mode
+        KvRecordBatch undoBatch =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 100L, 500, "Original"}));
+        kvTablet.putAsLeader(undoBatch, null, MergeMode.OVERWRITE);
+
+        // Verify the undo operation produced correct CDC log
+        LogRecords actualLogRecords = readLogRecords(beforeUndoOffset);
+        MemoryLogRecords expectedLogs =
+                logRecords(
+                        beforeUndoOffset,
+                        Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER),
+                        Arrays.asList(
+                                // Before: the aggregated state (count=150, max_val=600)
+                                new Object[] {1, 150L, 600, "Updated"},
+                                // After: restored to original (count=100, max_val=500)
+                                new Object[] {1, 100L, 500, "Original"}));
+
+        assertThatLogRecords(actualLogRecords)
+                .withSchema(AGG_ROW_TYPE)
+                .assertCheckSum(true)
+                .isEqualTo(expectedLogs);
+    }
+
+    @Test
+    void testOverwriteModeWithNewKey() throws Exception {
+        // OVERWRITE mode with a new key should behave like INSERT
+        KvRecordBatch batch =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"}));
+        kvTablet.putAsLeader(batch, null, MergeMode.OVERWRITE);
+
+        LogRecords actualLogRecords = readLogRecords(0);
+        MemoryLogRecords expectedLogs =
+                logRecords(
+                        0L,
+                        Collections.singletonList(ChangeType.INSERT),
+                        Collections.singletonList(new Object[] {1, 10L, 100, "Alice"}));
+
+        assertThatLogRecords(actualLogRecords)
+                .withSchema(AGG_ROW_TYPE)
+                .assertCheckSum(true)
+                .isEqualTo(expectedLogs);
+    }
+
+    @Test
+    void testOverwriteModeWithDelete() throws Exception {
+        // Insert initial record
+        KvRecordBatch batch1 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"}));
+        kvTablet.putAsLeader(batch1, null, MergeMode.DEFAULT);
+
+        long endOffset = logTablet.localLogEndOffset();
+
+        // Delete with OVERWRITE mode
+        KvRecordBatch deleteBatch =
+                kvRecordBatchFactory.ofRecords(kvRecordFactory.ofRecord("k1".getBytes(), null));
+        kvTablet.putAsLeader(deleteBatch, null, MergeMode.OVERWRITE);
+
+        // Verify DELETE is produced
+        LogRecords actualLogRecords = readLogRecords(endOffset);
+        MemoryLogRecords expectedLogs =
+                logRecords(
+                        endOffset,
+                        Collections.singletonList(ChangeType.DELETE),
+                        Collections.singletonList(new Object[] {1, 10L, 100, "Alice"}));
+
+        assertThatLogRecords(actualLogRecords)
+                .withSchema(AGG_ROW_TYPE)
+                .assertCheckSum(true)
+                .isEqualTo(expectedLogs);
+    }
+
+    @Test
+    void testMixedMergeModeOperations() throws Exception {
+        // Test interleaved DEFAULT and OVERWRITE operations
+
+        // 1. Insert with DEFAULT
+        KvRecordBatch batch1 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 10L, 100, "v1"}));
+        kvTablet.putAsLeader(batch1, null, MergeMode.DEFAULT);
+
+        // 2. Update with DEFAULT (should aggregate)
+        KvRecordBatch batch2 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord("k1".getBytes(), new Object[] {1, 5L, 150, "v2"}));
+        kvTablet.putAsLeader(batch2, null, MergeMode.DEFAULT);
+
+        long afterDefaultOffset = logTablet.localLogEndOffset();
+
+        // 3. Overwrite with specific value
+        KvRecordBatch batch3 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord("k1".getBytes(), new Object[] {1, 20L, 80, "v3"}));
+        kvTablet.putAsLeader(batch3, null, MergeMode.OVERWRITE);
+
+        long afterOverwriteOffset = logTablet.localLogEndOffset();
+
+        // 4. Continue with DEFAULT (should aggregate from overwritten value)
+        KvRecordBatch batch4 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 10L, 200, "v4"}));
+        kvTablet.putAsLeader(batch4, null, MergeMode.DEFAULT);
+
+        // Verify the final aggregation is based on overwritten value
+        LogRecords actualLogRecords = readLogRecords(afterOverwriteOffset);
+        MemoryLogRecords expectedLogs =
+                logRecords(
+                        afterOverwriteOffset,
+                        Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER),
+                        Arrays.asList(
+                                // Before: overwritten value
+                                new Object[] {1, 20L, 80, "v3"},
+                                // After: aggregated from overwritten value
+                                // count=20+10=30, max_val=max(80,200)=200
+                                new Object[] {1, 30L, 200, "v4"}));
+
+        assertThatLogRecords(actualLogRecords)
+                .withSchema(AGG_ROW_TYPE)
+                .assertCheckSum(true)
+                .isEqualTo(expectedLogs);
+    }
+
+    @Test
+    void testOverwriteModeWithPartialUpdate() throws Exception {
+        // Insert initial record
+        KvRecordBatch batch1 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"}));
+        kvTablet.putAsLeader(batch1, null, MergeMode.DEFAULT);
+
+        long endOffset = logTablet.localLogEndOffset();
+
+        // Partial update with OVERWRITE mode (only update id and count columns)
+        int[] targetColumns = new int[] {0, 1}; // id and count
+        KvRecordBatch batch2 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 5L, null, null}));
+        kvTablet.putAsLeader(batch2, targetColumns, MergeMode.OVERWRITE);
+
+        // Verify partial update with OVERWRITE: count should be replaced (not aggregated)
+        LogRecords actualLogRecords = readLogRecords(endOffset);
+        MemoryLogRecords expectedLogs =
+                logRecords(
+                        endOffset,
+                        Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER),
+                        Arrays.asList(
+                                new Object[] {1, 10L, 100, "Alice"}, // before
+                                new Object[] {
+                                    1, 5L, 100, "Alice"
+                                } // after: count replaced, others unchanged
+                                ));
+
+        assertThatLogRecords(actualLogRecords)
+                .withSchema(AGG_ROW_TYPE)
+                .assertCheckSum(true)
+                .isEqualTo(expectedLogs);
+    }
+
+    @Test
+    void testOverwriteModeWithMultipleKeys() throws Exception {
+        // Insert multiple records
+        List<KvRecord> initialRecords =
+                Arrays.asList(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"}),
+                        kvRecordFactory.ofRecord(
+                                "k2".getBytes(), new Object[] {2, 20L, 200, "Bob"}));
+        KvRecordBatch batch1 = kvRecordBatchFactory.ofRecords(initialRecords);
+        kvTablet.putAsLeader(batch1, null, MergeMode.DEFAULT);
+
+        long endOffset = logTablet.localLogEndOffset();
+
+        // Overwrite multiple keys in single batch
+        List<KvRecord> overwriteRecords =
+                Arrays.asList(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 5L, 50, "Alice2"}),
+                        kvRecordFactory.ofRecord(
+                                "k2".getBytes(), new Object[] {2, 8L, 80, "Bob2"}));
+        KvRecordBatch batch2 = kvRecordBatchFactory.ofRecords(overwriteRecords);
+        kvTablet.putAsLeader(batch2, null, MergeMode.OVERWRITE);
+
+        // Verify both keys are overwritten (not aggregated)
+        LogRecords actualLogRecords = readLogRecords(endOffset);
+        MemoryLogRecords expectedLogs =
+                logRecords(
+                        endOffset,
+                        Arrays.asList(
+                                ChangeType.UPDATE_BEFORE,
+                                ChangeType.UPDATE_AFTER,
+                                ChangeType.UPDATE_BEFORE,
+                                ChangeType.UPDATE_AFTER),
+                        Arrays.asList(
+                                new Object[] {1, 10L, 100, "Alice"},
+                                new Object[] {1, 5L, 50, "Alice2"}, // k1 overwritten
+                                new Object[] {2, 20L, 200, "Bob"},
+                                new Object[] {2, 8L, 80, "Bob2"} // k2 overwritten
+                                ));
+
+        assertThatLogRecords(actualLogRecords)
+                .withSchema(AGG_ROW_TYPE)
+                .assertCheckSum(true)
+                .isEqualTo(expectedLogs);
+    }
+
+    // ==================== Default MergeMode Tests ====================
+
+    @Test
+    void testDefaultMergeModeIsDefault() throws Exception {
+        // Insert initial record using default (no mergeMode parameter)
+        KvRecordBatch batch1 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"}));
+        kvTablet.putAsLeader(batch1, null); // Using overload without mergeMode
+
+        long endOffset = logTablet.localLogEndOffset();
+
+        // Update using default (should aggregate)
+        KvRecordBatch batch2 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 5L, 150, "Bob"}));
+        kvTablet.putAsLeader(batch2, null); // Using overload without mergeMode
+
+        // Verify aggregation happened (proving default is DEFAULT)
+        LogRecords actualLogRecords = readLogRecords(endOffset);
+        MemoryLogRecords expectedLogs =
+                logRecords(
+                        endOffset,
+                        Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER),
+                        Arrays.asList(
+                                new Object[] {1, 10L, 100, "Alice"},
+                                new Object[] {
+                                    1, 15L, 150, "Bob"
+                                } // count=10+5=15, max=max(100,150)=150
+                                ));
+
+        assertThatLogRecords(actualLogRecords)
+                .withSchema(AGG_ROW_TYPE)
+                .assertCheckSum(true)
+                .isEqualTo(expectedLogs);
+    }
+
+    // ==================== Helper Methods ====================
+
+    private LogRecords readLogRecords(long startOffset) throws Exception {
+        return logTablet
+                .read(startOffset, Integer.MAX_VALUE, FetchIsolation.LOG_END, false, null)
+                .getRecords();
+    }
+
+    private MemoryLogRecords logRecords(
+            long baseOffset, List<ChangeType> changeTypes, List<Object[]> rows) throws Exception {
+        return createBasicMemoryLogRecords(
+                AGG_ROW_TYPE,
+                SCHEMA_ID,
+                baseOffset,
+                -1L,
+                CURRENT_LOG_MAGIC_VALUE,
+                NO_WRITER_ID,
+                NO_BATCH_SEQUENCE,
+                changeTypes,
+                rows,
+                LogFormat.ARROW,
+                DEFAULT_COMPRESSION);
+    }
+}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
index 35afe7a86..fc8a4a054 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
@@ -52,6 +52,7 @@ import org.apache.fluss.rpc.entity.ProduceLogResultForBucket;
 import org.apache.fluss.rpc.entity.PutKvResultForBucket;
 import org.apache.fluss.rpc.protocol.ApiError;
 import org.apache.fluss.rpc.protocol.Errors;
+import org.apache.fluss.rpc.protocol.MergeMode;
 import org.apache.fluss.server.entity.FetchReqInfo;
 import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
 import org.apache.fluss.server.entity.NotifyLeaderAndIsrResultForBucket;
@@ -481,6 +482,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 1,
                 Collections.singletonMap(tb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
                 null,
+                MergeMode.DEFAULT,
                 PUT_KV_VERSION,
                 future::complete);
         assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 8));
@@ -494,6 +496,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                                         Collections.singletonMap(
                                                 tb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
                                         null,
+                                        MergeMode.DEFAULT,
                                         PUT_KV_VERSION,
                                         (result) -> {
                                             // do nothing.
@@ -509,6 +512,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 1,
                 Collections.singletonMap(unknownTb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
                 null,
+                MergeMode.DEFAULT,
                 PUT_KV_VERSION,
                 future::complete);
         assertThat(future.get())
@@ -544,6 +548,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                         genKvRecordBatchWithWriterId(
                                 data1, DATA1_KEY_TYPE, DATA1_ROW_TYPE, 100L, 0)),
                 null,
+                MergeMode.DEFAULT,
                 PUT_KV_VERSION,
                 future::complete);
         assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 5));
@@ -589,6 +594,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                         genKvRecordBatchWithWriterId(
                                 data2, DATA1_KEY_TYPE, DATA1_ROW_TYPE, 100L, 3)),
                 null,
+                MergeMode.DEFAULT,
                 PUT_KV_VERSION,
                 future::complete);
         PutKvResultForBucket putKvResultForBucket = future.get().get(0);
@@ -630,6 +636,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                         genKvRecordBatchWithWriterId(
                                 data3, DATA1_KEY_TYPE, DATA1_ROW_TYPE, 100L, 1)),
                 null,
+                MergeMode.DEFAULT,
                 PUT_KV_VERSION,
                 future::complete);
         assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 8));
@@ -679,6 +686,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                     1,
                     Collections.singletonMap(tb, genKvRecordBatch(deleteList)),
                     null,
+                    MergeMode.DEFAULT,
                     PUT_KV_VERSION,
                     future::complete);
             assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, i + 1));
@@ -691,6 +699,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 1,
                 Collections.singletonMap(tb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
                 null,
+                MergeMode.DEFAULT,
                 PUT_KV_VERSION,
                 future::complete);
         assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 18));
@@ -752,6 +761,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 1,
                 Collections.singletonMap(tb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
                 null,
+                MergeMode.DEFAULT,
                 PUT_KV_VERSION,
                 future1::complete);
         assertThat(future1.get()).containsOnly(new PutKvResultForBucket(tb, 8));
@@ -824,6 +834,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 1,
                 Collections.singletonMap(tb, genKvRecordBatch(keyType, rowType, data1)),
                 null,
+                MergeMode.DEFAULT,
                 PUT_KV_VERSION,
                 future::complete);
         assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 4));
@@ -904,6 +915,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 1,
                 Collections.singletonMap(tb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
                 null,
+                MergeMode.DEFAULT,
                 PUT_KV_VERSION,
                 future1::complete);
         assertThat(future1.get()).containsOnly(new PutKvResultForBucket(tb, 8));
@@ -1145,6 +1157,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 -1,
                 Collections.singletonMap(tb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
                 null,
+                MergeMode.DEFAULT,
                 PUT_KV_VERSION,
                 future::complete);
         assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 8));
@@ -1314,6 +1327,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                                                             Collections.singletonList(
                                                                     Tuple2.of(key, value)))),
                                             null,
+                                            MergeMode.DEFAULT,
                                             PUT_KV_VERSION,
                                             future::complete);
                                 } catch (Exception e) {
@@ -1395,6 +1409,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 -1,
                 entriesPerBucket,
                 null,
+                MergeMode.DEFAULT,
                 PUT_KV_VERSION,
                 writeResultForBuckets -> {
                     // do nothing
@@ -1434,6 +1449,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 -1,
                 entriesPerBucket,
                 null,
+                MergeMode.DEFAULT,
                 PUT_KV_VERSION,
                 writeResultForBuckets -> {
                     // do nothing
@@ -1847,7 +1863,13 @@ class ReplicaManagerTest extends ReplicaTestBase {
         CompletableFuture<List<PutKvResultForBucket>> writeFuture = new CompletableFuture<>();
         // put kv record batch for every bucket
         replicaManager.putRecordsToKv(
-                300000, -1, kvRecordBatchPerBucket, null, PUT_KV_VERSION, writeFuture::complete);
+                300000,
+                -1,
+                kvRecordBatchPerBucket,
+                null,
+                MergeMode.DEFAULT,
+                PUT_KV_VERSION,
+                writeFuture::complete);
         // wait the write ack
         writeFuture.get();
     }
@@ -1955,6 +1977,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
                 1,
                 Collections.singletonMap(tb, genKvRecordBatch(keyType, rowType, data)),
                 null,
+                MergeMode.DEFAULT,
                 oldClientVersion,
                 putFuture::complete);
         PutKvResultForBucket putResult = putFuture.get().get(0);
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
index 8bdd46db6..04331fb6a 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
@@ -32,6 +32,7 @@ import org.apache.fluss.record.LogRecordBatch;
 import org.apache.fluss.record.LogRecords;
 import org.apache.fluss.record.MemoryLogRecords;
 import org.apache.fluss.record.ProjectionPushdownCache;
+import org.apache.fluss.rpc.protocol.MergeMode;
 import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
 import org.apache.fluss.server.kv.KvTablet;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
@@ -868,7 +869,9 @@ final class ReplicaTest extends ReplicaTestBase {
 
     private LogAppendInfo putRecordsToLeader(
             Replica replica, KvRecordBatch kvRecords, int[] targetColumns) throws Exception {
-        LogAppendInfo logAppendInfo = replica.putRecordsToLeader(kvRecords, targetColumns, 0);
+        // Use DEFAULT mode as default for tests
+        LogAppendInfo logAppendInfo =
+                replica.putRecordsToLeader(kvRecords, targetColumns, MergeMode.DEFAULT, 0);
         KvTablet kvTablet = checkNotNull(replica.getKvTablet());
         // flush to make data visible
         kvTablet.flush(replica.getLocalLogEndOffset(), NOPErrorHandler.INSTANCE);
