This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 1e848975c [kv] Add OVERWRITE MergeMode support for undo recovery in
aggregation tables (#2492)
1e848975c is described below
commit 1e848975c1a905013150a87902ba44cbe149c64b
Author: Yang Wang <[email protected]>
AuthorDate: Fri Jan 30 19:04:00 2026 +0800
[kv] Add OVERWRITE MergeMode support for undo recovery in aggregation
tables (#2492)
---
.../fluss/client/table/writer/TableUpsert.java | 42 +-
.../apache/fluss/client/table/writer/Upsert.java | 46 +-
.../client/table/writer/UpsertWriterImpl.java | 21 +-
.../fluss/client/utils/ClientRpcMessageUtils.java | 21 +-
.../apache/fluss/client/write/KvWriteBatch.java | 19 +
.../fluss/client/write/RecordAccumulator.java | 1 +
.../org/apache/fluss/client/write/WriteRecord.java | 79 ++-
.../client/utils/ClientRpcMessageUtilsTest.java | 144 ++++++
.../fluss/client/write/KvWriteBatchTest.java | 84 ++++
.../org/apache/fluss/rpc/protocol/MergeMode.java | 119 +++++
fluss-rpc/src/main/proto/FlussApi.proto | 8 +
.../java/org/apache/fluss/server/kv/KvTablet.java | 38 +-
.../org/apache/fluss/server/replica/Replica.java | 8 +-
.../fluss/server/replica/ReplicaManager.java | 8 +-
.../apache/fluss/server/tablet/TabletService.java | 7 +
.../fluss/server/kv/KvTabletMergeModeTest.java | 557 +++++++++++++++++++++
.../fluss/server/replica/ReplicaManagerTest.java | 25 +-
.../apache/fluss/server/replica/ReplicaTest.java | 5 +-
18 files changed, 1203 insertions(+), 29 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java
index fe865eac4..ad2d0990d 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java
@@ -18,12 +18,16 @@
package org.apache.fluss.client.table.writer;
import org.apache.fluss.client.write.WriterClient;
+import org.apache.fluss.metadata.MergeEngineType;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.rpc.protocol.MergeMode;
import org.apache.fluss.types.RowType;
import javax.annotation.Nullable;
+import java.util.Optional;
+
import static org.apache.fluss.utils.Preconditions.checkNotNull;
/** API for configuring and creating {@link UpsertWriter}. */
@@ -32,22 +36,24 @@ public class TableUpsert implements Upsert {
private final TablePath tablePath;
private final TableInfo tableInfo;
private final WriterClient writerClient;
-
private final @Nullable int[] targetColumns;
+ private final MergeMode mergeMode;
public TableUpsert(TablePath tablePath, TableInfo tableInfo, WriterClient
writerClient) {
- this(tablePath, tableInfo, writerClient, null);
+ this(tablePath, tableInfo, writerClient, null, MergeMode.DEFAULT);
}
private TableUpsert(
TablePath tablePath,
TableInfo tableInfo,
WriterClient writerClient,
- @Nullable int[] targetColumns) {
+ @Nullable int[] targetColumns,
+ MergeMode mergeMode) {
this.tablePath = tablePath;
this.tableInfo = tableInfo;
this.writerClient = writerClient;
this.targetColumns = targetColumns;
+ this.mergeMode = mergeMode;
}
@Override
@@ -68,7 +74,7 @@ public class TableUpsert implements Upsert {
}
}
}
- return new TableUpsert(tablePath, tableInfo, writerClient,
targetColumns);
+ return new TableUpsert(tablePath, tableInfo, writerClient,
targetColumns, this.mergeMode);
}
@Override
@@ -91,13 +97,39 @@ public class TableUpsert implements Upsert {
return partialUpdate(targetColumns);
}
+ @Override
+ public Upsert mergeMode(MergeMode mode) {
+ checkNotNull(mode, "merge mode");
+ return new TableUpsert(tablePath, tableInfo, writerClient,
this.targetColumns, mode);
+ }
+
@Override
public UpsertWriter createWriter() {
- return new UpsertWriterImpl(tablePath, tableInfo, targetColumns,
writerClient);
+ // Check that OVERWRITE mode is only used with tables that have a
merge engine
+ if (mergeMode == MergeMode.OVERWRITE) {
+ Optional<MergeEngineType> mergeEngineType =
+ tableInfo.getTableConfig().getMergeEngineType();
+ if (!mergeEngineType.isPresent()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "MergeMode %s is only supported for tables
with a merge engine, "
+ + "but table %s does not have a merge
engine configured.",
+ mergeMode, tablePath));
+ }
+ }
+ return new UpsertWriterImpl(tablePath, tableInfo, targetColumns,
writerClient, mergeMode);
}
@Override
public <T> TypedUpsertWriter<T> createTypedWriter(Class<T> pojoClass) {
+ // TypedUpsertWriterImpl doesn't support mergeMode yet
+ if (mergeMode != MergeMode.DEFAULT) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "TypedUpsertWriter does not support MergeMode %s
yet. "
+ + "Please use createWriter() instead.",
+ mergeMode));
+ }
return new TypedUpsertWriterImpl<>(createWriter(), pojoClass,
tableInfo, targetColumns);
}
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java
index 0843437fe..e8c859272 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java
@@ -18,6 +18,7 @@
package org.apache.fluss.client.table.writer;
import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.rpc.protocol.MergeMode;
import javax.annotation.Nullable;
@@ -26,7 +27,8 @@ import javax.annotation.Nullable;
* Table.
*
* <p>{@link Upsert} objects are immutable and can be shared between threads.
Refinement methods,
- * like {@link #partialUpdate}, create new Upsert instances.
+ * like {@link #partialUpdate(int[])} and {@link #mergeMode(MergeMode)},
create new Upsert
+ * instances.
*
* @since 0.6
*/
@@ -56,9 +58,49 @@ public interface Upsert {
*/
Upsert partialUpdate(String... targetColumnNames);
+ /**
+ * Specify merge mode for the UpsertWriter and returns a new Upsert
instance.
+ *
+ * <p>This method controls how the created UpsertWriter handles data
merging for tables with
+ * merge engines:
+ *
+ * <ul>
+ * <li>{@link MergeMode#DEFAULT} (default): Data is merged through the
server-side merge
+ * engine. The behavior depends on the configured merge engine type:
+ * <ul>
+ * <li>For aggregation merge engine: applies aggregation functions
(e.g., SUM, MAX,
+ * MIN).
+ * <li>For first-row merge engine: retains the first observed row.
+ * <li>For versioned merge engine: keeps the row with the highest
version.
+ * </ul>
+ * <li>{@link MergeMode#OVERWRITE}: Data directly overwrites values,
bypassing the merge
+ * engine. This is useful for undo/recovery operations to restore
exact historical values.
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * // Default merge mode
+ * UpsertWriter normalWriter = table.newUpsert()
+ * .mergeMode(MergeMode.DEFAULT)
+ * .createWriter();
+ *
+ * // Overwrite mode for undo recovery
+ * UpsertWriter undoWriter = table.newUpsert()
+ * .mergeMode(MergeMode.OVERWRITE)
+ * .createWriter();
+ * }</pre>
+ *
+ * @param mode the merge mode
+ * @return a new Upsert instance with the specified merge mode
+ * @since 0.9
+ */
+ Upsert mergeMode(MergeMode mode);
+
/**
* Create a new {@link UpsertWriter} using {@code InternalRow} with the
optional {@link
- * #partialUpdate(String...)} information to upsert and delete data to a
Primary Key Table.
+ * #partialUpdate(String...)} and {@link #mergeMode(MergeMode)}
information to upsert and delete
+ * data to a Primary Key Table.
*/
UpsertWriter createWriter();
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
index 9a612e090..d2f52d5ca 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
@@ -30,6 +30,7 @@ import org.apache.fluss.row.compacted.CompactedRow;
import org.apache.fluss.row.encode.KeyEncoder;
import org.apache.fluss.row.encode.RowEncoder;
import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.rpc.protocol.MergeMode;
import org.apache.fluss.types.RowType;
import javax.annotation.Nullable;
@@ -55,11 +56,23 @@ class UpsertWriterImpl extends AbstractTableWriter
implements UpsertWriter {
private final FieldGetter[] fieldGetters;
private final TableInfo tableInfo;
+ /** The merge mode for this writer. This controls how the server handles
data merging. */
+ private final MergeMode mergeMode;
+
UpsertWriterImpl(
TablePath tablePath,
TableInfo tableInfo,
@Nullable int[] partialUpdateColumns,
WriterClient writerClient) {
+ this(tablePath, tableInfo, partialUpdateColumns, writerClient,
MergeMode.DEFAULT);
+ }
+
+ UpsertWriterImpl(
+ TablePath tablePath,
+ TableInfo tableInfo,
+ @Nullable int[] partialUpdateColumns,
+ WriterClient writerClient,
+ MergeMode mergeMode) {
super(tablePath, tableInfo, writerClient);
RowType rowType = tableInfo.getRowType();
sanityCheck(
@@ -88,7 +101,9 @@ class UpsertWriterImpl extends AbstractTableWriter
implements UpsertWriter {
this.writeFormat = WriteFormat.fromKvFormat(this.kvFormat);
this.rowEncoder = RowEncoder.create(kvFormat, rowType);
this.fieldGetters = InternalRow.createFieldGetters(rowType);
+
this.tableInfo = tableInfo;
+ this.mergeMode = mergeMode;
}
private static void sanityCheck(
@@ -173,7 +188,8 @@ class UpsertWriterImpl extends AbstractTableWriter
implements UpsertWriter {
key,
bucketKey,
writeFormat,
- targetColumns);
+ targetColumns,
+ mergeMode);
return send(record).thenApply(ignored -> UPSERT_SUCCESS);
}
@@ -196,7 +212,8 @@ class UpsertWriterImpl extends AbstractTableWriter
implements UpsertWriter {
key,
bucketKey,
writeFormat,
- targetColumns);
+ targetColumns,
+ mergeMode);
return send(record).thenApply(ignored -> DELETE_SUCCESS);
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
index f6b37f5d1..68d8712f1 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
@@ -79,6 +79,7 @@ import org.apache.fluss.rpc.messages.PrefixLookupRequest;
import org.apache.fluss.rpc.messages.ProduceLogRequest;
import org.apache.fluss.rpc.messages.PutKvRequest;
import org.apache.fluss.rpc.messages.RegisterProducerOffsetsRequest;
+import org.apache.fluss.rpc.protocol.MergeMode;
import org.apache.fluss.utils.json.DataTypeJsonSerde;
import org.apache.fluss.utils.json.JsonSerdeUtils;
@@ -138,11 +139,12 @@ public class ClientRpcMessageUtils {
.setTimeoutMs(maxRequestTimeoutMs);
// check the target columns in the batch list should be the same. If
not same,
// we throw exception directly currently.
- int[] targetColumns =
- ((KvWriteBatch)
readyWriteBatches.get(0).writeBatch()).getTargetColumns();
+ KvWriteBatch firstBatch = (KvWriteBatch)
readyWriteBatches.get(0).writeBatch();
+ int[] targetColumns = firstBatch.getTargetColumns();
+ MergeMode mergeMode = firstBatch.getMergeMode();
for (int i = 1; i < readyWriteBatches.size(); i++) {
- int[] currentBatchTargetColumns =
- ((KvWriteBatch)
readyWriteBatches.get(i).writeBatch()).getTargetColumns();
+ KvWriteBatch currentBatch = (KvWriteBatch)
readyWriteBatches.get(i).writeBatch();
+ int[] currentBatchTargetColumns = currentBatch.getTargetColumns();
if (!Arrays.equals(targetColumns, currentBatchTargetColumns)) {
throw new IllegalStateException(
String.format(
@@ -151,10 +153,21 @@ public class ClientRpcMessageUtils {
Arrays.toString(targetColumns),
Arrays.toString(currentBatchTargetColumns)));
}
+ // Validate mergeMode consistency across batches
+ if (currentBatch.getMergeMode() != mergeMode) {
+ throw new IllegalStateException(
+ String.format(
+ "All the write batches to make put kv request
should have the same mergeMode, "
+ + "but got %s and %s.",
+ mergeMode, currentBatch.getMergeMode()));
+ }
}
if (targetColumns != null) {
request.setTargetColumns(targetColumns);
}
+ // Set mergeMode in the request - this is the proper way to pass
mergeMode to server
+ request.setAggMode(mergeMode.getProtoValue());
+
readyWriteBatches.forEach(
readyBatch -> {
TableBucket tableBucket = readyBatch.tableBucket();
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java
index 72f596be9..a72d36717 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java
@@ -28,6 +28,7 @@ import org.apache.fluss.record.bytesview.BytesView;
import org.apache.fluss.row.BinaryRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.rpc.messages.PutKvRequest;
+import org.apache.fluss.rpc.protocol.MergeMode;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
@@ -51,6 +52,7 @@ public class KvWriteBatch extends WriteBatch {
private final KvRecordBatchBuilder recordsBuilder;
private final @Nullable int[] targetColumns;
private final int schemaId;
+ private final MergeMode mergeMode;
public KvWriteBatch(
int bucketId,
@@ -60,6 +62,7 @@ public class KvWriteBatch extends WriteBatch {
int writeLimit,
AbstractPagedOutputView outputView,
@Nullable int[] targetColumns,
+ MergeMode mergeMode,
long createdMs) {
super(bucketId, physicalTablePath, createdMs);
this.outputView = outputView;
@@ -67,6 +70,7 @@ public class KvWriteBatch extends WriteBatch {
KvRecordBatchBuilder.builder(schemaId, writeLimit, outputView,
kvFormat);
this.targetColumns = targetColumns;
this.schemaId = schemaId;
+ this.mergeMode = mergeMode;
}
@Override
@@ -94,6 +98,16 @@ public class KvWriteBatch extends WriteBatch {
Arrays.toString(targetColumns)));
}
+ // Validate mergeMode consistency - records with different mergeMode
cannot be batched
+ // together
+ if (writeRecord.getMergeMode() != this.mergeMode) {
+ throw new IllegalStateException(
+ String.format(
+ "Cannot mix records with different mergeMode in
the same batch. "
+ + "Batch mergeMode: %s, Record mergeMode:
%s",
+ this.mergeMode, writeRecord.getMergeMode()));
+ }
+
byte[] key = writeRecord.getKey();
checkNotNull(key, "key must be not null for kv record");
checkNotNull(callback, "write callback must be not null");
@@ -113,6 +127,10 @@ public class KvWriteBatch extends WriteBatch {
return targetColumns;
}
+ public MergeMode getMergeMode() {
+ return mergeMode;
+ }
+
@Override
public BytesView build() {
try {
@@ -163,6 +181,7 @@ public class KvWriteBatch extends WriteBatch {
recordsBuilder.abort();
}
+ @Override
public void resetWriterState(long writerId, int batchSequence) {
super.resetWriterState(writerId, batchSequence);
recordsBuilder.resetWriterState(writerId, batchSequence);
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
index 0c834a49f..a1f0544a5 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
@@ -617,6 +617,7 @@ public final class RecordAccumulator {
outputView.getPreAllocatedSize(),
outputView,
writeRecord.getTargetColumns(),
+ writeRecord.getMergeMode(),
clock.milliseconds());
case ARROW_LOG:
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
index 9265c5f77..f25562be7 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
@@ -27,6 +27,7 @@ import org.apache.fluss.row.BinaryRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.compacted.CompactedRow;
import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.rpc.protocol.MergeMode;
import javax.annotation.Nullable;
@@ -52,6 +53,27 @@ public final class WriteRecord {
byte[] bucketKey,
WriteFormat writeFormat,
@Nullable int[] targetColumns) {
+ return forUpsert(
+ tableInfo,
+ tablePath,
+ row,
+ key,
+ bucketKey,
+ writeFormat,
+ targetColumns,
+ MergeMode.DEFAULT);
+ }
+
+ /** Create a write record for upsert operation with merge mode control. */
+ public static WriteRecord forUpsert(
+ TableInfo tableInfo,
+ PhysicalTablePath tablePath,
+ BinaryRow row,
+ byte[] key,
+ byte[] bucketKey,
+ WriteFormat writeFormat,
+ @Nullable int[] targetColumns,
+ MergeMode mergeMode) {
checkNotNull(row, "row must not be null");
checkNotNull(key, "key must not be null");
checkNotNull(bucketKey, "bucketKey must not be null");
@@ -65,7 +87,8 @@ public final class WriteRecord {
row,
writeFormat,
targetColumns,
- estimatedSizeInBytes);
+ estimatedSizeInBytes,
+ mergeMode);
}
/** Create a write record for delete operation and partial-delete update.
*/
@@ -76,6 +99,25 @@ public final class WriteRecord {
byte[] bucketKey,
WriteFormat writeFormat,
@Nullable int[] targetColumns) {
+ return forDelete(
+ tableInfo,
+ tablePath,
+ key,
+ bucketKey,
+ writeFormat,
+ targetColumns,
+ MergeMode.DEFAULT);
+ }
+
+ /** Create a write record for delete operation with merge mode control. */
+ public static WriteRecord forDelete(
+ TableInfo tableInfo,
+ PhysicalTablePath tablePath,
+ byte[] key,
+ byte[] bucketKey,
+ WriteFormat writeFormat,
+ @Nullable int[] targetColumns,
+ MergeMode mergeMode) {
checkNotNull(key, "key must not be null");
checkNotNull(bucketKey, "key must not be null");
checkArgument(writeFormat.isKv(), "writeFormat must be a KV format");
@@ -88,7 +130,8 @@ public final class WriteRecord {
null,
writeFormat,
targetColumns,
- estimatedSizeInBytes);
+ estimatedSizeInBytes,
+ mergeMode);
}
/** Create a write record for append operation for indexed format. */
@@ -108,7 +151,8 @@ public final class WriteRecord {
row,
WriteFormat.INDEXED_LOG,
null,
- estimatedSizeInBytes);
+ estimatedSizeInBytes,
+ MergeMode.DEFAULT);
}
/** Creates a write record for append operation for Arrow format. */
@@ -129,7 +173,8 @@ public final class WriteRecord {
row,
WriteFormat.ARROW_LOG,
null,
- estimatedSizeInBytes);
+ estimatedSizeInBytes,
+ MergeMode.DEFAULT);
}
/** Creates a write record for append operation for Compacted format. */
@@ -149,7 +194,8 @@ public final class WriteRecord {
row,
WriteFormat.COMPACTED_LOG,
null,
- estimatedSizeInBytes);
+ estimatedSizeInBytes,
+ MergeMode.DEFAULT);
}
//
------------------------------------------------------------------------------------------
@@ -166,6 +212,16 @@ public final class WriteRecord {
private final int estimatedSizeInBytes;
private final TableInfo tableInfo;
+ /**
+ * The merge mode for this record. This controls how the server handles
data merging.
+ *
+ * <ul>
+ * <li>DEFAULT: Normal merge through server-side merge engine
+ * <li>OVERWRITE: Bypass merge engine, directly replace values (for undo
recovery)
+ * </ul>
+ */
+ private final MergeMode mergeMode;
+
private WriteRecord(
TableInfo tableInfo,
PhysicalTablePath physicalTablePath,
@@ -174,7 +230,8 @@ public final class WriteRecord {
@Nullable InternalRow row,
WriteFormat writeFormat,
@Nullable int[] targetColumns,
- int estimatedSizeInBytes) {
+ int estimatedSizeInBytes,
+ MergeMode mergeMode) {
this.tableInfo = tableInfo;
this.physicalTablePath = physicalTablePath;
this.key = key;
@@ -183,6 +240,7 @@ public final class WriteRecord {
this.writeFormat = writeFormat;
this.targetColumns = targetColumns;
this.estimatedSizeInBytes = estimatedSizeInBytes;
+ this.mergeMode = mergeMode;
}
public PhysicalTablePath getPhysicalTablePath() {
@@ -214,6 +272,15 @@ public final class WriteRecord {
return writeFormat;
}
+ /**
+ * Get the merge mode for this record.
+ *
+ * @return the merge mode
+ */
+ public MergeMode getMergeMode() {
+ return mergeMode;
+ }
+
/**
* Get the estimated size in bytes of the record with batch header.
*
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java
new file mode 100644
index 000000000..5b1ca58c7
--- /dev/null
+++
b/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.utils;
+
+import org.apache.fluss.client.write.KvWriteBatch;
+import org.apache.fluss.client.write.ReadyWriteBatch;
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.memory.PreAllocatedPagedOutputView;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.rpc.messages.PutKvRequest;
+import org.apache.fluss.rpc.protocol.MergeMode;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link ClientRpcMessageUtils}.
+ *
+ * <p>Focuses on MergeMode consistency validation in makePutKvRequest.
+ */
+class ClientRpcMessageUtilsTest {
+
+ private static final long TABLE_ID = DATA1_TABLE_ID_PK;
+ private static final int ACKS = 1;
+ private static final int TIMEOUT_MS = 30000;
+
+ @Test
+ void testMakePutKvRequestWithConsistentMergeMode() throws Exception {
+ // Create two batches with same mergeMode (DEFAULT)
+ KvWriteBatch batch1 = createKvWriteBatch(0, MergeMode.DEFAULT);
+ KvWriteBatch batch2 = createKvWriteBatch(1, MergeMode.DEFAULT);
+
+ List<ReadyWriteBatch> readyBatches =
+ Arrays.asList(
+ new ReadyWriteBatch(new TableBucket(TABLE_ID, 0),
batch1),
+ new ReadyWriteBatch(new TableBucket(TABLE_ID, 1),
batch2));
+
+ // Should succeed without exception
+ PutKvRequest request =
+ ClientRpcMessageUtils.makePutKvRequest(TABLE_ID, ACKS,
TIMEOUT_MS, readyBatches);
+
+ // Verify mergeMode is set correctly in request
+ assertThat(request.hasAggMode()).isTrue();
+
assertThat(request.getAggMode()).isEqualTo(MergeMode.DEFAULT.getProtoValue());
+ }
+
+ @Test
+ void testMakePutKvRequestWithOverwriteMode() throws Exception {
+ // Create batches with OVERWRITE mode
+ KvWriteBatch batch1 = createKvWriteBatch(0, MergeMode.OVERWRITE);
+ KvWriteBatch batch2 = createKvWriteBatch(1, MergeMode.OVERWRITE);
+
+ List<ReadyWriteBatch> readyBatches =
+ Arrays.asList(
+ new ReadyWriteBatch(new TableBucket(TABLE_ID, 0),
batch1),
+ new ReadyWriteBatch(new TableBucket(TABLE_ID, 1),
batch2));
+
+ PutKvRequest request =
+ ClientRpcMessageUtils.makePutKvRequest(TABLE_ID, ACKS,
TIMEOUT_MS, readyBatches);
+
+ // Verify OVERWRITE mode is set correctly
+ assertThat(request.hasAggMode()).isTrue();
+
assertThat(request.getAggMode()).isEqualTo(MergeMode.OVERWRITE.getProtoValue());
+ }
+
+ @Test
+ void testMakePutKvRequestWithInconsistentMergeMode() throws Exception {
+ // Create batches with different mergeModes
+ KvWriteBatch defaultBatch = createKvWriteBatch(0, MergeMode.DEFAULT);
+ KvWriteBatch overwriteBatch = createKvWriteBatch(1,
MergeMode.OVERWRITE);
+
+ List<ReadyWriteBatch> readyBatches =
+ Arrays.asList(
+ new ReadyWriteBatch(new TableBucket(TABLE_ID, 0),
defaultBatch),
+ new ReadyWriteBatch(new TableBucket(TABLE_ID, 1),
overwriteBatch));
+
+ // Should throw exception due to inconsistent mergeMode
+ assertThatThrownBy(
+ () ->
+ ClientRpcMessageUtils.makePutKvRequest(
+ TABLE_ID, ACKS, TIMEOUT_MS,
readyBatches))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining(
+ "All the write batches to make put kv request should
have the same mergeMode")
+ .hasMessageContaining("DEFAULT")
+ .hasMessageContaining("OVERWRITE");
+ }
+
+ @Test
+ void testMakePutKvRequestWithSingleBatch() throws Exception {
+ // Single batch should always succeed
+ KvWriteBatch batch = createKvWriteBatch(0, MergeMode.OVERWRITE);
+
+ List<ReadyWriteBatch> readyBatches =
+ Collections.singletonList(new ReadyWriteBatch(new
TableBucket(TABLE_ID, 0), batch));
+
+ PutKvRequest request =
+ ClientRpcMessageUtils.makePutKvRequest(TABLE_ID, ACKS,
TIMEOUT_MS, readyBatches);
+
+
assertThat(request.getAggMode()).isEqualTo(MergeMode.OVERWRITE.getProtoValue());
+ }
+
+ private KvWriteBatch createKvWriteBatch(int bucketId, MergeMode mergeMode)
throws Exception {
+ MemorySegment segment = MemorySegment.allocateHeapMemory(1024);
+ PreAllocatedPagedOutputView outputView =
+ new
PreAllocatedPagedOutputView(Collections.singletonList(segment));
+ return new KvWriteBatch(
+ bucketId,
+ PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
+ DATA1_TABLE_INFO_PK.getSchemaId(),
+ KvFormat.COMPACTED,
+ Integer.MAX_VALUE,
+ outputView,
+ null,
+ mergeMode,
+ System.currentTimeMillis());
+ }
+}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
index c62c7b029..3fe428388 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
@@ -33,6 +33,7 @@ import org.apache.fluss.record.KvRecordReadContext;
import org.apache.fluss.record.TestingSchemaGetter;
import org.apache.fluss.row.BinaryRow;
import org.apache.fluss.row.encode.CompactedKeyEncoder;
+import org.apache.fluss.rpc.protocol.MergeMode;
import org.apache.fluss.types.DataType;
import org.junit.jupiter.api.BeforeEach;
@@ -227,6 +228,7 @@ class KvWriteBatchTest {
writeLimit,
outputView,
null,
+ MergeMode.DEFAULT,
System.currentTimeMillis());
}
@@ -254,4 +256,86 @@ class KvWriteBatchTest {
assertThat(toArray(kvRecord.getKey())).isEqualTo(key);
assertThat(kvRecord.getRow()).isEqualTo(row);
}
+
+ // ==================== MergeMode Tests ====================
+
+ @Test
+ void testMergeModeConsistencyValidation() throws Exception {
+ // Create batch with DEFAULT mode
+ KvWriteBatch defaultBatch =
+ createKvWriteBatchWithMergeMode(
+ new TableBucket(DATA1_TABLE_ID_PK, 0),
MergeMode.DEFAULT);
+
+ // Append record with DEFAULT mode should succeed
+ WriteRecord defaultRecord =
createWriteRecordWithMergeMode(MergeMode.DEFAULT);
+ assertThat(defaultBatch.tryAppend(defaultRecord,
newWriteCallback())).isTrue();
+
+ // Append record with OVERWRITE mode should fail
+ WriteRecord overwriteRecord =
createWriteRecordWithMergeMode(MergeMode.OVERWRITE);
+ assertThatThrownBy(() -> defaultBatch.tryAppend(overwriteRecord,
newWriteCallback()))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining(
+ "Cannot mix records with different mergeMode in the
same batch")
+ .hasMessageContaining("Batch mergeMode: DEFAULT")
+ .hasMessageContaining("Record mergeMode: OVERWRITE");
+ }
+
+ @Test
+ void testOverwriteModeBatch() throws Exception {
+ // Create batch with OVERWRITE mode
+ KvWriteBatch overwriteBatch =
+ createKvWriteBatchWithMergeMode(
+ new TableBucket(DATA1_TABLE_ID_PK, 0),
MergeMode.OVERWRITE);
+
+ // Verify batch has correct mergeMode
+
assertThat(overwriteBatch.getMergeMode()).isEqualTo(MergeMode.OVERWRITE);
+
+ // Append record with OVERWRITE mode should succeed
+ WriteRecord overwriteRecord =
createWriteRecordWithMergeMode(MergeMode.OVERWRITE);
+ assertThat(overwriteBatch.tryAppend(overwriteRecord,
newWriteCallback())).isTrue();
+
+ // Append record with DEFAULT mode should fail
+ WriteRecord defaultRecord =
createWriteRecordWithMergeMode(MergeMode.DEFAULT);
+ assertThatThrownBy(() -> overwriteBatch.tryAppend(defaultRecord,
newWriteCallback()))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining(
+ "Cannot mix records with different mergeMode in the
same batch")
+ .hasMessageContaining("Batch mergeMode: OVERWRITE")
+ .hasMessageContaining("Record mergeMode: DEFAULT");
+ }
+
+ @Test
+ void testDefaultMergeModeIsDefault() throws Exception {
+ KvWriteBatch batch = createKvWriteBatch(new
TableBucket(DATA1_TABLE_ID_PK, 0));
+ assertThat(batch.getMergeMode()).isEqualTo(MergeMode.DEFAULT);
+ }
+
+ private KvWriteBatch createKvWriteBatchWithMergeMode(TableBucket tb,
MergeMode mergeMode)
+ throws Exception {
+ PreAllocatedPagedOutputView outputView =
+ new PreAllocatedPagedOutputView(
+ Collections.singletonList(memoryPool.nextSegment()));
+ return new KvWriteBatch(
+ tb.getBucket(),
+ PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
+ DATA1_TABLE_INFO_PK.getSchemaId(),
+ KvFormat.COMPACTED,
+ Integer.MAX_VALUE,
+ outputView,
+ null,
+ mergeMode,
+ System.currentTimeMillis());
+ }
+
+ private WriteRecord createWriteRecordWithMergeMode(MergeMode mergeMode) {
+ return WriteRecord.forUpsert(
+ DATA1_TABLE_INFO_PK,
+ PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
+ row,
+ key,
+ key,
+ WriteFormat.COMPACTED_KV,
+ null,
+ mergeMode);
+ }
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/rpc/protocol/MergeMode.java
b/fluss-common/src/main/java/org/apache/fluss/rpc/protocol/MergeMode.java
new file mode 100644
index 000000000..eb3cafbad
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/rpc/protocol/MergeMode.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.rpc.protocol;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+/**
+ * Merge mode for write operations on tables with merge engines.
+ *
+ * <p>This enum controls how the server handles data merging when writing to
tables with merge
+ * engines. It applies uniformly to all merge engine types (aggregation,
first-row, versioned).
+ *
+ * @since 0.9
+ */
+@PublicEvolving
+public enum MergeMode {
+
+ /**
+ * Default merge mode: Data is merged through the server-side merge engine.
+ *
+ * <p>This is the normal mode for tables with merge engines. The behavior
depends on the
+ * configured merge engine type:
+ *
+ * <ul>
+ * <li>For aggregation merge engine: applies configured aggregation
functions (e.g., SUM, MAX,
+ * MIN) to merge new values with existing values.
+ * <li>For first-row merge engine: retains the first observed row for
each primary key.
+ * <li>For versioned merge engine: keeps the row with the highest
version for each primary
+ * key.
+ * </ul>
+ */
+ DEFAULT(0),
+
+ /**
+ * Overwrite mode: Data directly overwrites target values, bypassing the
merge engine.
+ *
+ * <p>This mode is used for undo/recovery operations to restore exact
historical values. When in
+ * overwrite mode, the server will not apply any merge logic and will
directly replace the
+ * existing values with the new values.
+ *
+ * <p>This is similar to an "OVERWRITE INTO" statement and is typically
used internally by the
+ * Flink connector during failover recovery to restore the state to a
previous checkpoint.
+ *
+ * <p>This mode applies to all merge engine types.
+ */
+ OVERWRITE(1);
+
+ private final int value;
+
+ MergeMode(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Returns the integer value for this merge mode.
+ *
+ * <p>This value matches the {@code agg_mode} field value in the proto
definition.
+ *
+ * @return the integer value
+ */
+ public int getValue() {
+ return value;
+ }
+
+ /**
+ * Returns the proto value for this merge mode.
+ *
+ * <p>This is an alias for {@link #getValue()} for clarity when working
with proto messages.
+ *
+ * @return the proto value
+ */
+ public int getProtoValue() {
+ return value;
+ }
+
+ /**
+ * Converts an integer value to a MergeMode enum.
+ *
+ * @param value the integer value
+ * @return the corresponding MergeMode, or DEFAULT if the value is invalid
+ */
+ public static MergeMode fromValue(int value) {
+ switch (value) {
+ case 0:
+ return DEFAULT;
+ case 1:
+ return OVERWRITE;
+ default:
+ return DEFAULT;
+ }
+ }
+
+ /**
+ * Converts a proto value to a MergeMode enum.
+ *
+ * <p>This is an alias for {@link #fromValue(int)} for clarity when
working with proto messages.
+ *
+ * @param protoValue the proto value
+ * @return the corresponding MergeMode, or DEFAULT if the value is invalid
+ */
+ public static MergeMode fromProtoValue(int protoValue) {
+ return fromValue(protoValue);
+ }
+}
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto
b/fluss-rpc/src/main/proto/FlussApi.proto
index d3b1edb96..d800549d8 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -219,6 +219,11 @@ message FetchLogResponse {
repeated PbFetchLogRespForTable tables_resp = 1;
}
+// Merge mode constants for write operations (using int32 instead of an enum
for compatibility).
+// These values correspond to the Java-side MergeMode enum:
+// 0 = DEFAULT: data is merged through the server-side merge engine (the
default)
+// 1 = OVERWRITE: bypass the merge engine and directly replace values (for
undo recovery)
+// Value 2 is reserved for future client-side local aggregation; servers
treat unknown values as DEFAULT.
+
// put kv request and response
message PutKvRequest {
required int32 acks = 1;
@@ -228,6 +233,9 @@ message PutKvRequest {
// if empty, means write all columns
repeated int32 target_columns = 4 [packed = true];
repeated PbPutKvReqForBucket buckets_req = 5;
+ // Merge mode for this request: 0 = DEFAULT (merge via the server-side
merge engine),
+ // 1 = OVERWRITE (bypass the merge engine; used for undo recovery).
+ // Unset is treated as 0 (DEFAULT); unknown values also fall back to
DEFAULT.
+ optional int32 agg_mode = 6;
}
message PutKvResponse {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index 79ab8eb21..0eee04c6c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -45,6 +45,7 @@ import org.apache.fluss.row.PaddingRow;
import org.apache.fluss.row.arrow.ArrowWriterPool;
import org.apache.fluss.row.arrow.ArrowWriterProvider;
import org.apache.fluss.row.encode.ValueDecoder;
+import org.apache.fluss.rpc.protocol.MergeMode;
import org.apache.fluss.server.kv.autoinc.AutoIncrementManager;
import org.apache.fluss.server.kv.autoinc.AutoIncrementUpdater;
import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer;
@@ -115,6 +116,9 @@ public final class KvTablet {
private final KvFormat kvFormat;
// defines how to merge rows on the same primary key
private final RowMerger rowMerger;
+ // Pre-created DefaultRowMerger for OVERWRITE mode (undo recovery
scenarios)
+ // This avoids creating a new instance on every putAsLeader call
+ private final RowMerger overwriteRowMerger;
private final ArrowCompressionInfo arrowCompressionInfo;
private final AutoIncrementManager autoIncrementManager;
@@ -166,6 +170,9 @@ public final class KvTablet {
this.memorySegmentPool = memorySegmentPool;
this.kvFormat = kvFormat;
this.rowMerger = rowMerger;
+ // Pre-create DefaultRowMerger for OVERWRITE mode to avoid creating
new instances
+ // on every putAsLeader call. Used for undo recovery scenarios.
+ this.overwriteRowMerger = new DefaultRowMerger(kvFormat,
DeleteBehavior.ALLOW);
this.arrowCompressionInfo = arrowCompressionInfo;
this.schemaGetter = schemaGetter;
this.changelogImage = changelogImage;
@@ -273,6 +280,20 @@ public final class KvTablet {
return flushedLogOffset;
}
+ /**
+ * Put the KvRecordBatch into the kv storage using the default merge mode.
+ *
+ * <p>This is a convenience method that calls {@link
#putAsLeader(KvRecordBatch, int[],
+ * MergeMode)} with {@link MergeMode#DEFAULT}.
+ *
+ * @param kvRecords the kv records to put into
+ * @param targetColumns the target columns to put, null if put all columns
+ */
+ public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[]
targetColumns)
+ throws Exception {
+ return putAsLeader(kvRecords, targetColumns, MergeMode.DEFAULT);
+ }
+
/**
* Put the KvRecordBatch into the kv storage, and return the appended wal
log info.
*
@@ -292,8 +313,10 @@ public final class KvTablet {
*
* @param kvRecords the kv records to put into
* @param targetColumns the target columns to put, null if put all columns
+ * @param mergeMode the merge mode (DEFAULT or OVERWRITE)
*/
- public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[]
targetColumns)
+ public LogAppendInfo putAsLeader(
+ KvRecordBatch kvRecords, @Nullable int[] targetColumns, MergeMode
mergeMode)
throws Exception {
return inWriteLock(
kvLock,
@@ -305,10 +328,17 @@ public final class KvTablet {
short latestSchemaId = (short) schemaInfo.getSchemaId();
validateSchemaId(kvRecords.schemaId(), latestSchemaId);
- // we only support ADD COLUMN, so targetColumns is fine to
be used directly
+ // Determine the row merger based on mergeMode:
+ // - DEFAULT: Use the configured merge engine (rowMerger)
+ // - OVERWRITE: Bypass merge engine, use pre-created
overwriteRowMerger
+ // to directly replace values (for undo recovery
scenarios)
+ // We only support ADD COLUMN, so targetColumns is fine to
be used directly.
RowMerger currentMerger =
- rowMerger.configureTargetColumns(
- targetColumns, latestSchemaId,
latestSchema);
+ (mergeMode == MergeMode.OVERWRITE)
+ ?
overwriteRowMerger.configureTargetColumns(
+ targetColumns, latestSchemaId,
latestSchema)
+ : rowMerger.configureTargetColumns(
+ targetColumns, latestSchemaId,
latestSchema);
AutoIncrementUpdater currentAutoIncrementUpdater =
autoIncrementManager.getUpdaterForSchema(kvFormat,
latestSchemaId);
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 041bcb910..b884ed88b 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -45,6 +45,7 @@ import org.apache.fluss.record.KvRecordBatch;
import org.apache.fluss.record.LogRecords;
import org.apache.fluss.record.MemoryLogRecords;
import org.apache.fluss.rpc.protocol.Errors;
+import org.apache.fluss.rpc.protocol.MergeMode;
import org.apache.fluss.server.SequenceIDCounter;
import org.apache.fluss.server.coordinator.CoordinatorContext;
import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
@@ -946,7 +947,10 @@ public final class Replica {
}
public LogAppendInfo putRecordsToLeader(
- KvRecordBatch kvRecords, @Nullable int[] targetColumns, int
requiredAcks)
+ KvRecordBatch kvRecords,
+ @Nullable int[] targetColumns,
+ MergeMode mergeMode,
+ int requiredAcks)
throws Exception {
return inReadLock(
leaderIsrUpdateLock,
@@ -964,7 +968,7 @@ public final class Replica {
kv, "KvTablet for the replica to put kv records
shouldn't be null.");
LogAppendInfo logAppendInfo;
try {
- logAppendInfo = kv.putAsLeader(kvRecords,
targetColumns);
+ logAppendInfo = kv.putAsLeader(kvRecords,
targetColumns, mergeMode);
} catch (IOException e) {
LOG.error("Error while putting records to {}",
tableBucket, e);
fatalErrorHandler.onFatalError(e);
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index eeb7a38d2..a9bfde8e2 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -60,6 +60,7 @@ import
org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.rpc.protocol.ApiKeys;
import org.apache.fluss.rpc.protocol.Errors;
+import org.apache.fluss.rpc.protocol.MergeMode;
import org.apache.fluss.server.coordinator.CoordinatorContext;
import org.apache.fluss.server.entity.FetchReqInfo;
import org.apache.fluss.server.entity.LakeBucketOffset;
@@ -559,6 +560,7 @@ public class ReplicaManager {
int requiredAcks,
Map<TableBucket, KvRecordBatch> entriesPerBucket,
@Nullable int[] targetColumns,
+ MergeMode mergeMode,
short apiVersion,
Consumer<List<PutKvResultForBucket>> responseCallback) {
if (isRequiredAcksInvalid(requiredAcks)) {
@@ -567,7 +569,7 @@ public class ReplicaManager {
long startTime = System.currentTimeMillis();
Map<TableBucket, PutKvResultForBucket> kvPutResult =
- putToLocalKv(entriesPerBucket, targetColumns, requiredAcks,
apiVersion);
+ putToLocalKv(entriesPerBucket, targetColumns, mergeMode,
requiredAcks, apiVersion);
LOG.debug(
"Put records to local kv storage and wait generate cdc log in
{} ms",
System.currentTimeMillis() - startTime);
@@ -1047,6 +1049,7 @@ public class ReplicaManager {
private Map<TableBucket, PutKvResultForBucket> putToLocalKv(
Map<TableBucket, KvRecordBatch> entriesPerBucket,
@Nullable int[] targetColumns,
+ MergeMode mergeMode,
int requiredAcks,
short apiVersion) {
Map<TableBucket, PutKvResultForBucket> putResultForBucketMap = new
HashMap<>();
@@ -1060,7 +1063,8 @@ public class ReplicaManager {
tableMetrics = replica.tableMetrics();
tableMetrics.totalPutKvRequests().inc();
LogAppendInfo appendInfo =
- replica.putRecordsToLeader(entry.getValue(),
targetColumns, requiredAcks);
+ replica.putRecordsToLeader(
+ entry.getValue(), targetColumns, mergeMode,
requiredAcks);
LOG.trace(
"Written to local kv for {}, and the cdc log beginning
at offset {} and ending at offset {}",
tb,
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
index c06c382e6..145cd1f0f 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
@@ -62,6 +62,7 @@ import org.apache.fluss.rpc.messages.UpdateMetadataRequest;
import org.apache.fluss.rpc.messages.UpdateMetadataResponse;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.rpc.protocol.Errors;
+import org.apache.fluss.rpc.protocol.MergeMode;
import org.apache.fluss.security.acl.OperationType;
import org.apache.fluss.security.acl.Resource;
import org.apache.fluss.server.DynamicConfigManager;
@@ -224,12 +225,18 @@ public final class TabletService extends RpcServiceBase
implements TabletServerG
authorizeTable(WRITE, request.getTableId());
Map<TableBucket, KvRecordBatch> putKvData = getPutKvData(request);
+ // Resolve the merge mode from the request, falling back to DEFAULT when
the field is absent.
+ MergeMode mergeMode =
+ request.hasAggMode()
+ ? MergeMode.fromValue(request.getAggMode())
+ : MergeMode.DEFAULT;
CompletableFuture<PutKvResponse> response = new CompletableFuture<>();
replicaManager.putRecordsToKv(
request.getTimeoutMs(),
request.getAcks(),
putKvData,
getTargetColumns(request),
+ mergeMode,
currentSession().getApiVersion(),
bucketResponse ->
response.complete(makePutKvResponse(bucketResponse)));
return response;
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletMergeModeTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletMergeModeTest.java
new file mode 100644
index 000000000..ce1842309
--- /dev/null
+++
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletMergeModeTest.java
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.TableConfig;
+import org.apache.fluss.memory.TestingMemorySegmentPool;
+import org.apache.fluss.metadata.AggFunctions;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.LogFormat;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaInfo;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.KvRecord;
+import org.apache.fluss.record.KvRecordBatch;
+import org.apache.fluss.record.KvRecordTestUtils;
+import org.apache.fluss.record.LogRecords;
+import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.record.TestingSchemaGetter;
+import org.apache.fluss.rpc.protocol.MergeMode;
+import org.apache.fluss.server.kv.autoinc.AutoIncrementManager;
+import org.apache.fluss.server.kv.autoinc.TestingSequenceGeneratorFactory;
+import org.apache.fluss.server.kv.rowmerger.RowMerger;
+import org.apache.fluss.server.log.FetchIsolation;
+import org.apache.fluss.server.log.LogTablet;
+import org.apache.fluss.server.log.LogTestUtils;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.utils.clock.SystemClock;
+import org.apache.fluss.utils.concurrent.FlussScheduler;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
+import static
org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords;
+import static org.apache.fluss.testutils.LogRecordsAssert.assertThatLogRecords;
+
+/**
+ * Tests for {@link KvTablet} with {@link MergeMode} support.
+ *
+ * <p>These tests verify that OVERWRITE mode correctly bypasses the merge
engine and directly
+ * replaces values, which is essential for undo recovery scenarios.
+ */
+class KvTabletMergeModeTest {
+
+ private static final short SCHEMA_ID = 1;
+
+ private final Configuration conf = new Configuration();
+ private final KvRecordTestUtils.KvRecordBatchFactory kvRecordBatchFactory =
+ KvRecordTestUtils.KvRecordBatchFactory.of(SCHEMA_ID);
+
+ private @TempDir File tempLogDir;
+ private @TempDir File tmpKvDir;
+
+ private TestingSchemaGetter schemaGetter;
+ private LogTablet logTablet;
+ private KvTablet kvTablet;
+
+ // Schema with aggregation functions for testing
+ private static final Schema AGG_SCHEMA =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("count", DataTypes.BIGINT(), AggFunctions.SUM())
+ .column("max_val", DataTypes.INT(), AggFunctions.MAX())
+ .column("name", DataTypes.STRING(),
AggFunctions.LAST_VALUE())
+ .primaryKey("id")
+ .build();
+
+ private static final RowType AGG_ROW_TYPE = AGG_SCHEMA.getRowType();
+
+ private final KvRecordTestUtils.KvRecordFactory kvRecordFactory =
+ KvRecordTestUtils.KvRecordFactory.of(AGG_ROW_TYPE);
+
+ @BeforeEach
+ void setUp() throws Exception {
+ Map<String, String> config = new HashMap<>();
+ config.put("table.merge-engine", "aggregation");
+
+ TablePath tablePath = TablePath.of("testDb", "test_merge_mode");
+ PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath);
+ schemaGetter = new TestingSchemaGetter(new SchemaInfo(AGG_SCHEMA,
SCHEMA_ID));
+
+ File logTabletDir =
+ LogTestUtils.makeRandomLogTabletDir(
+ tempLogDir,
+ physicalTablePath.getDatabaseName(),
+ 0L,
+ physicalTablePath.getTableName());
+ logTablet =
+ LogTablet.create(
+ physicalTablePath,
+ logTabletDir,
+ conf,
+ TestingMetricGroups.TABLET_SERVER_METRICS,
+ 0,
+ new FlussScheduler(1),
+ LogFormat.ARROW,
+ 1,
+ true,
+ SystemClock.getInstance(),
+ true);
+
+ TableBucket tableBucket = logTablet.getTableBucket();
+ TableConfig tableConf = new TableConfig(Configuration.fromMap(config));
+ RowMerger rowMerger = RowMerger.create(tableConf, KvFormat.COMPACTED,
schemaGetter);
+ AutoIncrementManager autoIncrementManager =
+ new AutoIncrementManager(
+ schemaGetter,
+ tablePath,
+ new TableConfig(new Configuration()),
+ new TestingSequenceGeneratorFactory());
+
+ kvTablet =
+ KvTablet.create(
+ physicalTablePath,
+ tableBucket,
+ logTablet,
+ tmpKvDir,
+ conf,
+ TestingMetricGroups.TABLET_SERVER_METRICS,
+ new RootAllocator(Long.MAX_VALUE),
+ new TestingMemorySegmentPool(10 * 1024),
+ KvFormat.COMPACTED,
+ rowMerger,
+ DEFAULT_COMPRESSION,
+ schemaGetter,
+ tableConf.getChangelogImage(),
+ KvManager.getDefaultRateLimiter(),
+ autoIncrementManager);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (kvTablet != null) {
+ kvTablet.close();
+ }
+ if (logTablet != null) {
+ logTablet.close();
+ }
+ }
+
+ // ==================== DEFAULT Mode Tests ====================
+
+ @Test
+ void testDefaultModeAppliesMergeEngine() throws Exception {
+ // Insert initial record: id=1, count=10, max_val=100, name="Alice"
+ KvRecordBatch batch1 =
+ kvRecordBatchFactory.ofRecords(
+ kvRecordFactory.ofRecord(
+ "k1".getBytes(), new Object[] {1, 10L, 100,
"Alice"}));
+ kvTablet.putAsLeader(batch1, null, MergeMode.DEFAULT);
+
+ long endOffset = logTablet.localLogEndOffset();
+
+ // Update with DEFAULT mode: count should be summed, max_val should
take max
+ // id=1, count=5, max_val=150, name="Bob"
+ KvRecordBatch batch2 =
+ kvRecordBatchFactory.ofRecords(
+ kvRecordFactory.ofRecord(
+ "k1".getBytes(), new Object[] {1, 5L, 150,
"Bob"}));
+ kvTablet.putAsLeader(batch2, null, MergeMode.DEFAULT);
+
+ // Verify CDC log shows aggregated values
+ LogRecords actualLogRecords = readLogRecords(endOffset);
+ MemoryLogRecords expectedLogs =
+ logRecords(
+ endOffset,
+ Arrays.asList(ChangeType.UPDATE_BEFORE,
ChangeType.UPDATE_AFTER),
+ Arrays.asList(
+ new Object[] {1, 10L, 100, "Alice"}, // before
+ new Object[] {
+ 1, 15L, 150, "Bob"
+ } // after: count=10+5, max=max(100,150)
+ ));
+
+ assertThatLogRecords(actualLogRecords)
+ .withSchema(AGG_ROW_TYPE)
+ .assertCheckSum(true)
+ .isEqualTo(expectedLogs);
+ }
+
+ // ==================== OVERWRITE Mode Tests ====================
+
+ @Test
+ void testOverwriteModeBypassesMergeEngine() throws Exception {
+ // Insert initial record with DEFAULT mode
+ KvRecordBatch batch1 =
+ kvRecordBatchFactory.ofRecords(
+ kvRecordFactory.ofRecord(
+ "k1".getBytes(), new Object[] {1, 10L, 100,
"Alice"}));
+ kvTablet.putAsLeader(batch1, null, MergeMode.DEFAULT);
+
+ long endOffset = logTablet.localLogEndOffset();
+
+ // Update with OVERWRITE mode: values should be directly replaced, not
aggregated
+ // id=1, count=5, max_val=50, name="Bob"
+ KvRecordBatch batch2 =
+ kvRecordBatchFactory.ofRecords(
+ kvRecordFactory.ofRecord("k1".getBytes(), new Object[]
{1, 5L, 50, "Bob"}));
+ kvTablet.putAsLeader(batch2, null, MergeMode.OVERWRITE);
+
+ // Verify CDC log shows directly replaced values (not aggregated)
+ LogRecords actualLogRecords = readLogRecords(endOffset);
+ MemoryLogRecords expectedLogs =
+ logRecords(
+ endOffset,
+ Arrays.asList(ChangeType.UPDATE_BEFORE,
ChangeType.UPDATE_AFTER),
+ Arrays.asList(
+ new Object[] {1, 10L, 100, "Alice"}, // before
+ new Object[] {
+ 1, 5L, 50, "Bob"
+ } // after: directly replaced, NOT aggregated
+ ));
+
+ assertThatLogRecords(actualLogRecords)
+ .withSchema(AGG_ROW_TYPE)
+ .assertCheckSum(true)
+ .isEqualTo(expectedLogs);
+
+ // Key assertion: count=5 (not 15), max_val=50 (not 100)
+ // This proves OVERWRITE bypassed the merge engine
+ }
+
+ @Test
+ void testOverwriteModeForUndoRecoveryScenario() throws Exception {
+ // Simulate a typical undo recovery scenario:
+ // 1. Initial state: id=1, count=100, max_val=500, name="Original"
+ // 2. After some operations: id=1, count=150, max_val=600,
name="Updated"
+ // 3. Undo recovery needs to restore to: id=1, count=100, max_val=500,
name="Original"
+
+ // Step 1: Insert initial record
+ KvRecordBatch initialBatch =
+ kvRecordBatchFactory.ofRecords(
+ kvRecordFactory.ofRecord(
+ "k1".getBytes(), new Object[] {1, 100L, 500,
"Original"}));
+ kvTablet.putAsLeader(initialBatch, null, MergeMode.DEFAULT);
+
+ // Step 2: Simulate some aggregation operations
+ KvRecordBatch updateBatch =
+ kvRecordBatchFactory.ofRecords(
+ kvRecordFactory.ofRecord(
+ "k1".getBytes(), new Object[] {1, 50L, 600,
"Updated"}));
+ kvTablet.putAsLeader(updateBatch, null, MergeMode.DEFAULT);
+
+ long beforeUndoOffset = logTablet.localLogEndOffset();
+
+ // Step 3: Undo recovery - restore to original state using OVERWRITE
mode
+ KvRecordBatch undoBatch =
+ kvRecordBatchFactory.ofRecords(
+ kvRecordFactory.ofRecord(
+ "k1".getBytes(), new Object[] {1, 100L, 500,
"Original"}));
+ kvTablet.putAsLeader(undoBatch, null, MergeMode.OVERWRITE);
+
+ // Verify the undo operation produced correct CDC log
+ LogRecords actualLogRecords = readLogRecords(beforeUndoOffset);
+ MemoryLogRecords expectedLogs =
+ logRecords(
+ beforeUndoOffset,
+ Arrays.asList(ChangeType.UPDATE_BEFORE,
ChangeType.UPDATE_AFTER),
+ Arrays.asList(
+ // Before: the aggregated state (count=150,
max_val=600)
+ new Object[] {1, 150L, 600, "Updated"},
+ // After: restored to original (count=100,
max_val=500)
+ new Object[] {1, 100L, 500, "Original"}));
+
+ assertThatLogRecords(actualLogRecords)
+ .withSchema(AGG_ROW_TYPE)
+ .assertCheckSum(true)
+ .isEqualTo(expectedLogs);
+ }
+
+ @Test
+ void testOverwriteModeWithNewKey() throws Exception {
+ // OVERWRITE mode with a new key should behave like INSERT
+ KvRecordBatch batch =
+ kvRecordBatchFactory.ofRecords(
+ kvRecordFactory.ofRecord(
+ "k1".getBytes(), new Object[] {1, 10L, 100,
"Alice"}));
+ kvTablet.putAsLeader(batch, null, MergeMode.OVERWRITE);
+
+ LogRecords actualLogRecords = readLogRecords(0);
+ MemoryLogRecords expectedLogs =
+ logRecords(
+ 0L,
+ Collections.singletonList(ChangeType.INSERT),
+ Collections.singletonList(new Object[] {1, 10L, 100,
"Alice"}));
+
+ assertThatLogRecords(actualLogRecords)
+ .withSchema(AGG_ROW_TYPE)
+ .assertCheckSum(true)
+ .isEqualTo(expectedLogs);
+ }
+
+ @Test
+ void testOverwriteModeWithDelete() throws Exception {
+ // Insert initial record
+ KvRecordBatch batch1 =
+ kvRecordBatchFactory.ofRecords(
+ kvRecordFactory.ofRecord(
+ "k1".getBytes(), new Object[] {1, 10L, 100,
"Alice"}));
+ kvTablet.putAsLeader(batch1, null, MergeMode.DEFAULT);
+
+ long endOffset = logTablet.localLogEndOffset();
+
+ // Delete with OVERWRITE mode
+ KvRecordBatch deleteBatch =
+
kvRecordBatchFactory.ofRecords(kvRecordFactory.ofRecord("k1".getBytes(), null));
+ kvTablet.putAsLeader(deleteBatch, null, MergeMode.OVERWRITE);
+
+ // Verify DELETE is produced
+ LogRecords actualLogRecords = readLogRecords(endOffset);
+ MemoryLogRecords expectedLogs =
+ logRecords(
+ endOffset,
+ Collections.singletonList(ChangeType.DELETE),
+ Collections.singletonList(new Object[] {1, 10L, 100,
"Alice"}));
+
+ assertThatLogRecords(actualLogRecords)
+ .withSchema(AGG_ROW_TYPE)
+ .assertCheckSum(true)
+ .isEqualTo(expectedLogs);
+ }
+
+ @Test
+ void testMixedMergeModeOperations() throws Exception {
+ // Test interleaved DEFAULT and OVERWRITE operations
+
+ // 1. Insert with DEFAULT
+ KvRecordBatch batch1 =
+ kvRecordBatchFactory.ofRecords(
+ kvRecordFactory.ofRecord(
+ "k1".getBytes(), new Object[] {1, 10L, 100,
"v1"}));
+ kvTablet.putAsLeader(batch1, null, MergeMode.DEFAULT);
+
+ // 2. Update with DEFAULT (should aggregate)
+ KvRecordBatch batch2 =
+ kvRecordBatchFactory.ofRecords(
+ kvRecordFactory.ofRecord("k1".getBytes(), new Object[]
{1, 5L, 150, "v2"}));
+ kvTablet.putAsLeader(batch2, null, MergeMode.DEFAULT);
+
+ long afterDefaultOffset = logTablet.localLogEndOffset();
+
+ // 3. Overwrite with specific value
+ KvRecordBatch batch3 =
+ kvRecordBatchFactory.ofRecords(
+ kvRecordFactory.ofRecord("k1".getBytes(), new Object[]
{1, 20L, 80, "v3"}));
+ kvTablet.putAsLeader(batch3, null, MergeMode.OVERWRITE);
+
+ long afterOverwriteOffset = logTablet.localLogEndOffset();
+
+ // 4. Continue with DEFAULT (should aggregate from overwritten value)
+ KvRecordBatch batch4 =
+ kvRecordBatchFactory.ofRecords(
+ kvRecordFactory.ofRecord(
+ "k1".getBytes(), new Object[] {1, 10L, 200,
"v4"}));
+ kvTablet.putAsLeader(batch4, null, MergeMode.DEFAULT);
+
+ // Verify the final aggregation is based on overwritten value
+ LogRecords actualLogRecords = readLogRecords(afterOverwriteOffset);
+ MemoryLogRecords expectedLogs =
+ logRecords(
+ afterOverwriteOffset,
+ Arrays.asList(ChangeType.UPDATE_BEFORE,
ChangeType.UPDATE_AFTER),
+ Arrays.asList(
+ // Before: overwritten value
+ new Object[] {1, 20L, 80, "v3"},
+ // After: aggregated from overwritten value
+ // count=20+10=30, max_val=max(80,200)=200
+ new Object[] {1, 30L, 200, "v4"}));
+
+ assertThatLogRecords(actualLogRecords)
+ .withSchema(AGG_ROW_TYPE)
+ .assertCheckSum(true)
+ .isEqualTo(expectedLogs);
+ }
+
+ @Test
+ void testOverwriteModeWithPartialUpdate() throws Exception {
+ // Insert initial record
+ KvRecordBatch batch1 =
+ kvRecordBatchFactory.ofRecords(
+ kvRecordFactory.ofRecord(
+ "k1".getBytes(), new Object[] {1, 10L, 100,
"Alice"}));
+ kvTablet.putAsLeader(batch1, null, MergeMode.DEFAULT);
+
+ long endOffset = logTablet.localLogEndOffset();
+
+ // Partial update with OVERWRITE mode (only update id and count
columns)
+ int[] targetColumns = new int[] {0, 1}; // id and count
+ KvRecordBatch batch2 =
+ kvRecordBatchFactory.ofRecords(
+ kvRecordFactory.ofRecord(
+ "k1".getBytes(), new Object[] {1, 5L, null,
null}));
+ kvTablet.putAsLeader(batch2, targetColumns, MergeMode.OVERWRITE);
+
+ // Verify partial update with OVERWRITE: count should be replaced (not
aggregated)
+ LogRecords actualLogRecords = readLogRecords(endOffset);
+ MemoryLogRecords expectedLogs =
+ logRecords(
+ endOffset,
+ Arrays.asList(ChangeType.UPDATE_BEFORE,
ChangeType.UPDATE_AFTER),
+ Arrays.asList(
+ new Object[] {1, 10L, 100, "Alice"}, // before
+ new Object[] {
+ 1, 5L, 100, "Alice"
+ } // after: count replaced, others unchanged
+ ));
+
+ assertThatLogRecords(actualLogRecords)
+ .withSchema(AGG_ROW_TYPE)
+ .assertCheckSum(true)
+ .isEqualTo(expectedLogs);
+ }
+
+ @Test
+ void testOverwriteModeWithMultipleKeys() throws Exception {
+ // Insert multiple records
+ List<KvRecord> initialRecords =
+ Arrays.asList(
+ kvRecordFactory.ofRecord(
+ "k1".getBytes(), new Object[] {1, 10L, 100,
"Alice"}),
+ kvRecordFactory.ofRecord(
+ "k2".getBytes(), new Object[] {2, 20L, 200,
"Bob"}));
+ KvRecordBatch batch1 = kvRecordBatchFactory.ofRecords(initialRecords);
+ kvTablet.putAsLeader(batch1, null, MergeMode.DEFAULT);
+
+ long endOffset = logTablet.localLogEndOffset();
+
+ // Overwrite multiple keys in single batch
+ List<KvRecord> overwriteRecords =
+ Arrays.asList(
+ kvRecordFactory.ofRecord(
+ "k1".getBytes(), new Object[] {1, 5L, 50,
"Alice2"}),
+ kvRecordFactory.ofRecord(
+ "k2".getBytes(), new Object[] {2, 8L, 80,
"Bob2"}));
+ KvRecordBatch batch2 =
kvRecordBatchFactory.ofRecords(overwriteRecords);
+ kvTablet.putAsLeader(batch2, null, MergeMode.OVERWRITE);
+
+ // Verify both keys are overwritten (not aggregated)
+ LogRecords actualLogRecords = readLogRecords(endOffset);
+ MemoryLogRecords expectedLogs =
+ logRecords(
+ endOffset,
+ Arrays.asList(
+ ChangeType.UPDATE_BEFORE,
+ ChangeType.UPDATE_AFTER,
+ ChangeType.UPDATE_BEFORE,
+ ChangeType.UPDATE_AFTER),
+ Arrays.asList(
+ new Object[] {1, 10L, 100, "Alice"},
+ new Object[] {1, 5L, 50, "Alice2"}, // k1
overwritten
+ new Object[] {2, 20L, 200, "Bob"},
+ new Object[] {2, 8L, 80, "Bob2"} // k2
overwritten
+ ));
+
+ assertThatLogRecords(actualLogRecords)
+ .withSchema(AGG_ROW_TYPE)
+ .assertCheckSum(true)
+ .isEqualTo(expectedLogs);
+ }
+
+ // ==================== Default MergeMode Tests ====================
+
+ @Test
+ void testDefaultMergeModeIsDefault() throws Exception {
+ // Insert initial record using default (no mergeMode parameter)
+ KvRecordBatch batch1 =
+ kvRecordBatchFactory.ofRecords(
+ kvRecordFactory.ofRecord(
+ "k1".getBytes(), new Object[] {1, 10L, 100,
"Alice"}));
+ kvTablet.putAsLeader(batch1, null); // Using overload without mergeMode
+
+ long endOffset = logTablet.localLogEndOffset();
+
+ // Update using default (should aggregate)
+ KvRecordBatch batch2 =
+ kvRecordBatchFactory.ofRecords(
+ kvRecordFactory.ofRecord(
+ "k1".getBytes(), new Object[] {1, 5L, 150,
"Bob"}));
+ kvTablet.putAsLeader(batch2, null); // Using overload without mergeMode
+
+ // Verify aggregation happened (proving default is DEFAULT)
+ LogRecords actualLogRecords = readLogRecords(endOffset);
+ MemoryLogRecords expectedLogs =
+ logRecords(
+ endOffset,
+ Arrays.asList(ChangeType.UPDATE_BEFORE,
ChangeType.UPDATE_AFTER),
+ Arrays.asList(
+ new Object[] {1, 10L, 100, "Alice"},
+ new Object[] {
+ 1, 15L, 150, "Bob"
+ } // count=10+5=15, max=max(100,150)=150
+ ));
+
+ assertThatLogRecords(actualLogRecords)
+ .withSchema(AGG_ROW_TYPE)
+ .assertCheckSum(true)
+ .isEqualTo(expectedLogs);
+ }
+
+ // ==================== Helper Methods ====================
+
+ private LogRecords readLogRecords(long startOffset) throws Exception {
+ return logTablet
+ .read(startOffset, Integer.MAX_VALUE, FetchIsolation.LOG_END,
false, null)
+ .getRecords();
+ }
+
+ private MemoryLogRecords logRecords(
+ long baseOffset, List<ChangeType> changeTypes, List<Object[]>
rows) throws Exception {
+ return createBasicMemoryLogRecords(
+ AGG_ROW_TYPE,
+ SCHEMA_ID,
+ baseOffset,
+ -1L,
+ CURRENT_LOG_MAGIC_VALUE,
+ NO_WRITER_ID,
+ NO_BATCH_SEQUENCE,
+ changeTypes,
+ rows,
+ LogFormat.ARROW,
+ DEFAULT_COMPRESSION);
+ }
+}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
index 35afe7a86..fc8a4a054 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
@@ -52,6 +52,7 @@ import org.apache.fluss.rpc.entity.ProduceLogResultForBucket;
import org.apache.fluss.rpc.entity.PutKvResultForBucket;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.rpc.protocol.Errors;
+import org.apache.fluss.rpc.protocol.MergeMode;
import org.apache.fluss.server.entity.FetchReqInfo;
import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
import org.apache.fluss.server.entity.NotifyLeaderAndIsrResultForBucket;
@@ -481,6 +482,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
1,
Collections.singletonMap(tb,
genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
null,
+ MergeMode.DEFAULT,
PUT_KV_VERSION,
future::complete);
assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 8));
@@ -494,6 +496,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
Collections.singletonMap(
tb,
genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
null,
+ MergeMode.DEFAULT,
PUT_KV_VERSION,
(result) -> {
// do nothing.
@@ -509,6 +512,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
1,
Collections.singletonMap(unknownTb,
genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
null,
+ MergeMode.DEFAULT,
PUT_KV_VERSION,
future::complete);
assertThat(future.get())
@@ -544,6 +548,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
genKvRecordBatchWithWriterId(
data1, DATA1_KEY_TYPE, DATA1_ROW_TYPE, 100L,
0)),
null,
+ MergeMode.DEFAULT,
PUT_KV_VERSION,
future::complete);
assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 5));
@@ -589,6 +594,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
genKvRecordBatchWithWriterId(
data2, DATA1_KEY_TYPE, DATA1_ROW_TYPE, 100L,
3)),
null,
+ MergeMode.DEFAULT,
PUT_KV_VERSION,
future::complete);
PutKvResultForBucket putKvResultForBucket = future.get().get(0);
@@ -630,6 +636,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
genKvRecordBatchWithWriterId(
data3, DATA1_KEY_TYPE, DATA1_ROW_TYPE, 100L,
1)),
null,
+ MergeMode.DEFAULT,
PUT_KV_VERSION,
future::complete);
assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 8));
@@ -679,6 +686,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
1,
Collections.singletonMap(tb, genKvRecordBatch(deleteList)),
null,
+ MergeMode.DEFAULT,
PUT_KV_VERSION,
future::complete);
assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb,
i + 1));
@@ -691,6 +699,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
1,
Collections.singletonMap(tb,
genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
null,
+ MergeMode.DEFAULT,
PUT_KV_VERSION,
future::complete);
assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb,
18));
@@ -752,6 +761,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
1,
Collections.singletonMap(tb,
genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
null,
+ MergeMode.DEFAULT,
PUT_KV_VERSION,
future1::complete);
assertThat(future1.get()).containsOnly(new PutKvResultForBucket(tb,
8));
@@ -824,6 +834,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
1,
Collections.singletonMap(tb, genKvRecordBatch(keyType,
rowType, data1)),
null,
+ MergeMode.DEFAULT,
PUT_KV_VERSION,
future::complete);
assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 4));
@@ -904,6 +915,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
1,
Collections.singletonMap(tb,
genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
null,
+ MergeMode.DEFAULT,
PUT_KV_VERSION,
future1::complete);
assertThat(future1.get()).containsOnly(new PutKvResultForBucket(tb,
8));
@@ -1145,6 +1157,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
-1,
Collections.singletonMap(tb,
genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
null,
+ MergeMode.DEFAULT,
PUT_KV_VERSION,
future::complete);
assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 8));
@@ -1314,6 +1327,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
Collections.singletonList(
Tuple2.of(key, value)))),
null,
+ MergeMode.DEFAULT,
PUT_KV_VERSION,
future::complete);
} catch (Exception e) {
@@ -1395,6 +1409,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
-1,
entriesPerBucket,
null,
+ MergeMode.DEFAULT,
PUT_KV_VERSION,
writeResultForBuckets -> {
// do nothing
@@ -1434,6 +1449,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
-1,
entriesPerBucket,
null,
+ MergeMode.DEFAULT,
PUT_KV_VERSION,
writeResultForBuckets -> {
// do nothing
@@ -1847,7 +1863,13 @@ class ReplicaManagerTest extends ReplicaTestBase {
CompletableFuture<List<PutKvResultForBucket>> writeFuture = new
CompletableFuture<>();
// put kv record batch for every bucket
replicaManager.putRecordsToKv(
- 300000, -1, kvRecordBatchPerBucket, null, PUT_KV_VERSION,
writeFuture::complete);
+ 300000,
+ -1,
+ kvRecordBatchPerBucket,
+ null,
+ MergeMode.DEFAULT,
+ PUT_KV_VERSION,
+ writeFuture::complete);
// wait the write ack
writeFuture.get();
}
@@ -1955,6 +1977,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
1,
Collections.singletonMap(tb, genKvRecordBatch(keyType,
rowType, data)),
null,
+ MergeMode.DEFAULT,
oldClientVersion,
putFuture::complete);
PutKvResultForBucket putResult = putFuture.get().get(0);
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
index 8bdd46db6..04331fb6a 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
@@ -32,6 +32,7 @@ import org.apache.fluss.record.LogRecordBatch;
import org.apache.fluss.record.LogRecords;
import org.apache.fluss.record.MemoryLogRecords;
import org.apache.fluss.record.ProjectionPushdownCache;
+import org.apache.fluss.rpc.protocol.MergeMode;
import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
import org.apache.fluss.server.kv.KvTablet;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
@@ -868,7 +869,9 @@ final class ReplicaTest extends ReplicaTestBase {
private LogAppendInfo putRecordsToLeader(
Replica replica, KvRecordBatch kvRecords, int[] targetColumns)
throws Exception {
- LogAppendInfo logAppendInfo = replica.putRecordsToLeader(kvRecords,
targetColumns, 0);
+ // Use DEFAULT mode as default for tests
+ LogAppendInfo logAppendInfo =
+ replica.putRecordsToLeader(kvRecords, targetColumns,
MergeMode.DEFAULT, 0);
KvTablet kvTablet = checkNotNull(replica.getKvTablet());
// flush to make data visible
kvTablet.flush(replica.getLocalLogEndOffset(),
NOPErrorHandler.INSTANCE);