This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new e5d10ac61 [kv] Support "table.changelog.image" configuration and WAL 
image mode for Primary Key Tables (#2105)
e5d10ac61 is described below

commit e5d10ac61ef4b211bbce1901ece61e172942e7c7
Author: Yang Wang <[email protected]>
AuthorDate: Sat Dec 20 00:23:26 2025 +0800

    [kv] Support "table.changelog.image" configuration and WAL image mode for 
Primary Key Tables (#2105)
    
    Co-authored-by: Jark Wu <[email protected]>
---
 .../org/apache/fluss/config/ConfigOptions.java     |  16 ++
 .../java/org/apache/fluss/config/TableConfig.java  |   9 +
 .../org/apache/fluss/metadata/ChangelogImage.java  |  59 ++++
 .../fluss/flink/source/FlinkTableSource.java       |  25 +-
 .../fluss/flink/sink/FlinkTableSinkITCase.java     |  70 +++++
 .../java/org/apache/fluss/server/kv/KvManager.java |  15 +-
 .../java/org/apache/fluss/server/kv/KvTablet.java  | 298 ++++++++++++++-------
 .../org/apache/fluss/server/kv/KvTabletTest.java   | 140 +++++++++-
 website/docs/engine-flink/options.md               |   1 +
 9 files changed, 518 insertions(+), 115 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java 
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index b33ffc983..7eb203999 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -20,6 +20,7 @@ package org.apache.fluss.config;
 import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.annotation.PublicEvolving;
 import org.apache.fluss.compression.ArrowCompressionType;
+import org.apache.fluss.metadata.ChangelogImage;
 import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.KvFormat;
@@ -1446,6 +1447,21 @@ public class ConfigOptions {
                                     + "The auto increment column can only be 
used in primary-key table. The data type of the auto increment column must be 
INT or BIGINT."
                                     + "Currently a table can have only one 
auto-increment column.");
 
+    public static final ConfigOption<ChangelogImage> TABLE_CHANGELOG_IMAGE =
+            key("table.changelog.image")
+                    .enumType(ChangelogImage.class)
+                    .defaultValue(ChangelogImage.FULL)
+                    .withDescription(
+                            "Defines the changelog image mode for the primary 
key table. "
+                                    + "This configuration is inspired by 
similar settings in database systems like MySQL's binlog_row_image and 
PostgreSQL's replica identity. "
+                                    + "The supported modes are `FULL` 
(default) and `WAL`. "
+                                    + "The `FULL` mode produces both 
UPDATE_BEFORE and UPDATE_AFTER records for update operations, capturing 
complete information about updates and allowing tracking of previous values. "
+                                    + "The `WAL` mode does not produce 
UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if allowed) 
records are emitted. "
+                                    + "When WAL mode is enabled with default 
merge engine (no merge engine configured) and full row updates (not partial 
update), an optimization is applied to skip looking up old values, "
+                                    + "and in this case INSERT operations are 
converted to UPDATE_AFTER events. "
+                                    + "This mode reduces storage and 
transmission costs but loses the ability to track previous values. "
+                                    + "This option only affects primary key 
tables.");
+
     // ------------------------------------------------------------------------
     //  ConfigOptions for Kv
     // ------------------------------------------------------------------------
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java 
b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
index a1422f460..fc7966ab0 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
@@ -19,6 +19,7 @@ package org.apache.fluss.config;
 
 import org.apache.fluss.annotation.PublicEvolving;
 import org.apache.fluss.compression.ArrowCompressionInfo;
+import org.apache.fluss.metadata.ChangelogImage;
 import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.KvFormat;
@@ -117,6 +118,14 @@ public class TableConfig {
         return config.getOptional(ConfigOptions.TABLE_DELETE_BEHAVIOR);
     }
 
+    /**
+     * Gets the changelog image mode of the table. The changelog image mode 
defines what information
+     * is included in the changelog for update operations.
+     */
+    public ChangelogImage getChangelogImage() {
+        return config.get(ConfigOptions.TABLE_CHANGELOG_IMAGE);
+    }
+
     /** Gets the Arrow compression type and compression level of the table. */
     public ArrowCompressionInfo getArrowCompressionInfo() {
         return ArrowCompressionInfo.fromConf(config);
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java
new file mode 100644
index 000000000..02f7a5750
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+/**
+ * The changelog image mode for the primary key table.
+ *
+ * <p>This enum defines what information is included in the changelog for 
update operations. It is
+ * inspired by similar configurations in database systems like MySQL's 
binlog_row_image and
+ * PostgreSQL's replica identity.
+ *
+ * @since 0.9
+ */
+public enum ChangelogImage {
+
+    /**
+     * Full changelog with both UPDATE_BEFORE and UPDATE_AFTER records. This 
is the default behavior
+     * that captures complete information about updates, allowing tracking of 
previous values.
+     */
+    FULL,
+
+    /**
+     * WAL mode does not produce UPDATE_BEFORE records. Only INSERT, 
UPDATE_AFTER (and DELETE if
+     * allowed) records are emitted. When WAL mode is enabled with default 
merge engine (no merge
+     * engine configured) and full row updates (not partial update), an 
optimization is applied to
+     * skip looking up old values, and in this case INSERT operations are 
converted to UPDATE_AFTER
+     * events, similar to database WAL (Write-Ahead Log) behavior. This mode 
reduces storage and
+     * transmission costs but loses the ability to track previous values.
+     */
+    WAL;
+
+    /** Creates a {@link ChangelogImage} from the given string. */
+    public static ChangelogImage fromString(String image) {
+        switch (image.toUpperCase()) {
+            case "FULL":
+                return FULL;
+            case "WAL":
+                return WAL;
+
+            default:
+                throw new IllegalArgumentException("Unsupported changelog 
image: " + image);
+        }
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index ef4b63812..2394921f1 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -31,6 +31,7 @@ import org.apache.fluss.flink.utils.PushdownUtils;
 import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
 import org.apache.fluss.lake.source.LakeSource;
 import org.apache.fluss.lake.source.LakeSplit;
+import org.apache.fluss.metadata.ChangelogImage;
 import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.MergeEngineType;
 import org.apache.fluss.metadata.TablePath;
@@ -206,10 +207,32 @@ public class FlinkTableSource
                 if (mergeEngineType == MergeEngineType.FIRST_ROW) {
                     return ChangelogMode.insertOnly();
                 } else {
-                    // Check delete behavior configuration
                     Configuration tableConf = 
Configuration.fromMap(tableOptions);
                     DeleteBehavior deleteBehavior =
                             tableConf.get(ConfigOptions.TABLE_DELETE_BEHAVIOR);
+                    ChangelogImage changelogImage =
+                            tableConf.get(ConfigOptions.TABLE_CHANGELOG_IMAGE);
+                    if (changelogImage == ChangelogImage.WAL) {
+                        // When using WAL mode, produce INSERT and 
UPDATE_AFTER (and DELETE if
+                        // allowed), without UPDATE_BEFORE. Note: with default 
merge engine and full
+                        // row updates, an optimization converts INSERT to 
UPDATE_AFTER.
+                        if (deleteBehavior == DeleteBehavior.ALLOW) {
+                            // DELETE is still produced when delete behavior 
is allowed
+                            return ChangelogMode.newBuilder()
+                                    .addContainedKind(RowKind.INSERT)
+                                    .addContainedKind(RowKind.UPDATE_AFTER)
+                                    .addContainedKind(RowKind.DELETE)
+                                    .build();
+                        } else {
+                            // No DELETE when delete operations are ignored or 
disabled
+                            return ChangelogMode.newBuilder()
+                                    .addContainedKind(RowKind.INSERT)
+                                    .addContainedKind(RowKind.UPDATE_AFTER)
+                                    .build();
+                        }
+                    }
+
+                    // Using FULL mode, produce full changelog
                     if (deleteBehavior == DeleteBehavior.ALLOW) {
                         return ChangelogMode.all();
                     } else {
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index c8aeaaae8..c373dd998 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -1384,4 +1384,74 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
             assertResultsIgnoreOrder(rowIter, expectedRows, true);
         }
     }
+
+    @Test
+    void testWalModeWithDefaultMergeEngineAndAggregation() throws Exception {
+        String tableName = "wal_mode_pk_table";
+        // Create a table with WAL mode and default merge engine
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + " id int not null,"
+                                + " category string,"
+                                + " amount bigint,"
+                                + " primary key (id) not enforced"
+                                + ") with ('table.changelog.image' = 'wal')",
+                        tableName));
+
+        // Insert initial data
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO %s VALUES "
+                                        + "(1, 'A', 100), "
+                                        + "(2, 'B', 200), "
+                                        + "(3, 'A', 150), "
+                                        + "(4, 'B', 250)",
+                                tableName))
+                .await();
+
+        // Use batch mode to update and delete records
+        tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 120 WHERE 
id = 1").await();
+        tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 180 WHERE 
id = 3").await();
+        tBatchEnv.executeSql("DELETE FROM " + tableName + " WHERE id = 
4").await();
+
+        // Do aggregation on the table and verify ChangelogNormalize node is 
generated
+        String aggQuery =
+                String.format(
+                        "SELECT category, SUM(amount) as total_amount FROM %s 
/*+ OPTIONS('scan.startup.mode' = 'earliest') */ GROUP BY category",
+                        tableName);
+
+        // Explain the aggregation query to check for ChangelogNormalize
+        String aggPlan = tEnv.explainSql(aggQuery);
+        // ChangelogNormalize should be present to normalize the changelog for 
aggregation
+        // In Flink, when the source produces changelog with primary key 
semantics (I, UA, D),
+        // a ChangelogNormalize operator is inserted before aggregation
+        assertThat(aggPlan).contains("ChangelogNormalize");
+
+        // Execute the aggregation and verify the result
+        CloseableIterator<Row> aggIter = tEnv.executeSql(aggQuery).collect();
+
+        // Expected aggregation results:
+        // Category A: 120 (id=1) + 180 (id=3) = 300
+        // Category B: 200 (id=2) = 200 (id=4 was deleted)
+        List<String> expectedAggResults =
+                Arrays.asList(
+                        "+I[A, 100]",
+                        "-U[A, 100]",
+                        "+U[A, 250]",
+                        "-U[A, 250]",
+                        "+U[A, 150]",
+                        "-U[A, 150]",
+                        "+U[A, 270]",
+                        "-U[A, 270]",
+                        "+U[A, 120]",
+                        "-U[A, 120]",
+                        "+U[A, 300]",
+                        "+I[B, 250]",
+                        "-D[B, 250]",
+                        "+I[B, 200]");
+
+        // Collect results with timeout
+        assertResultsIgnoreOrder(aggIter, expectedAggResults, true);
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
index d1e24f6bc..b5318ef60 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
@@ -185,7 +185,8 @@ public final class KvManager extends TabletManagerBase {
                                     kvFormat,
                                     merger,
                                     arrowCompressionInfo,
-                                    schemaGetter);
+                                    schemaGetter,
+                                    tableConfig.getChangelogImage());
                     currentKvs.put(tableBucket, tablet);
 
                     LOG.info(
@@ -277,9 +278,8 @@ public final class KvManager extends TabletManagerBase {
         TablePath tablePath = physicalTablePath.getTablePath();
         TableInfo tableInfo = getTableInfo(zkClient, tablePath);
 
-        RowMerger rowMerger =
-                RowMerger.create(
-                        tableInfo.getTableConfig(), 
tableInfo.getTableConfig().getKvFormat());
+        TableConfig tableConfig = tableInfo.getTableConfig();
+        RowMerger rowMerger = RowMerger.create(tableConfig, 
tableConfig.getKvFormat());
         KvTablet kvTablet =
                 KvTablet.create(
                         physicalTablePath,
@@ -290,10 +290,11 @@ public final class KvManager extends TabletManagerBase {
                         serverMetricGroup,
                         arrowBufferAllocator,
                         memorySegmentPool,
-                        tableInfo.getTableConfig().getKvFormat(),
+                        tableConfig.getKvFormat(),
                         rowMerger,
-                        tableInfo.getTableConfig().getArrowCompressionInfo(),
-                        schemaGetter);
+                        tableConfig.getArrowCompressionInfo(),
+                        schemaGetter,
+                        tableConfig.getChangelogImage());
         if (this.currentKvs.containsKey(tableBucket)) {
             throw new IllegalStateException(
                     String.format(
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index e6542d8f4..1f1fb5f0f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -25,6 +25,7 @@ import org.apache.fluss.exception.DeletionDisabledException;
 import org.apache.fluss.exception.KvStorageException;
 import org.apache.fluss.exception.SchemaNotExistException;
 import org.apache.fluss.memory.MemorySegmentPool;
+import org.apache.fluss.metadata.ChangelogImage;
 import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.LogFormat;
@@ -49,6 +50,7 @@ import 
org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason;
 import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
 import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder;
 import org.apache.fluss.server.kv.rocksdb.RocksDBResourceContainer;
+import org.apache.fluss.server.kv.rowmerger.DefaultRowMerger;
 import org.apache.fluss.server.kv.rowmerger.RowMerger;
 import org.apache.fluss.server.kv.snapshot.KvFileHandleAndLocalPath;
 import org.apache.fluss.server.kv.snapshot.KvSnapshotDataUploader;
@@ -114,6 +116,9 @@ public final class KvTablet {
 
     private final SchemaGetter schemaGetter;
 
+    // the changelog image mode for this tablet
+    private final ChangelogImage changelogImage;
+
     /**
      * The kv data in pre-write buffer whose log offset is less than the 
flushedLogOffset has been
      * flushed into kv.
@@ -137,7 +142,8 @@ public final class KvTablet {
             KvFormat kvFormat,
             RowMerger rowMerger,
             ArrowCompressionInfo arrowCompressionInfo,
-            SchemaGetter schemaGetter) {
+            SchemaGetter schemaGetter,
+            ChangelogImage changelogImage) {
         this.physicalPath = physicalPath;
         this.tableBucket = tableBucket;
         this.logTablet = logTablet;
@@ -152,6 +158,7 @@ public final class KvTablet {
         this.rowMerger = rowMerger;
         this.arrowCompressionInfo = arrowCompressionInfo;
         this.schemaGetter = schemaGetter;
+        this.changelogImage = changelogImage;
     }
 
     public static KvTablet create(
@@ -164,7 +171,8 @@ public final class KvTablet {
             KvFormat kvFormat,
             RowMerger rowMerger,
             ArrowCompressionInfo arrowCompressionInfo,
-            SchemaGetter schemaGetter)
+            SchemaGetter schemaGetter,
+            ChangelogImage changelogImage)
             throws IOException {
         Tuple2<PhysicalTablePath, TableBucket> tablePathAndBucket =
                 FlussPaths.parseTabletDir(kvTabletDir);
@@ -180,7 +188,8 @@ public final class KvTablet {
                 kvFormat,
                 rowMerger,
                 arrowCompressionInfo,
-                schemaGetter);
+                schemaGetter,
+                changelogImage);
     }
 
     public static KvTablet create(
@@ -195,7 +204,8 @@ public final class KvTablet {
             KvFormat kvFormat,
             RowMerger rowMerger,
             ArrowCompressionInfo arrowCompressionInfo,
-            SchemaGetter schemaGetter)
+            SchemaGetter schemaGetter,
+            ChangelogImage changelogImage)
             throws IOException {
         RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir);
         return new KvTablet(
@@ -212,7 +222,8 @@ public final class KvTablet {
                 kvFormat,
                 rowMerger,
                 arrowCompressionInfo,
-                schemaGetter);
+                schemaGetter,
+                changelogImage);
     }
 
     private static RocksDBKv buildRocksDBKv(Configuration configuration, File 
kvDir)
@@ -282,20 +293,13 @@ public final class KvTablet {
                     SchemaInfo schemaInfo = schemaGetter.getLatestSchemaInfo();
                     Schema latestSchema = schemaInfo.getSchema();
                     short latestSchemaId = (short) schemaInfo.getSchemaId();
-                    short schemaIdOfNewData = kvRecords.schemaId();
-                    if (schemaIdOfNewData > latestSchemaId || 
schemaIdOfNewData < 0) {
-                        // TODO: we may need to support retriable exception 
here
-                        throw new SchemaNotExistException(
-                                "Invalid schema id: "
-                                        + schemaIdOfNewData
-                                        + ", latest schema id: "
-                                        + latestSchemaId);
-                    }
+                    validateSchemaId(kvRecords.schemaId(), latestSchemaId);
 
                     // we only support ADD COLUMN, so targetColumns is fine to 
be used directly
                     RowMerger currentMerger =
                             rowMerger.configureTargetColumns(
                                     targetColumns, latestSchemaId, 
latestSchema);
+
                     RowType latestRowType = latestSchema.getRowType();
                     WalBuilder walBuilder = createWalBuilder(latestSchemaId, 
latestRowType);
                     walBuilder.setWriterState(kvRecords.writerId(), 
kvRecords.batchSequence());
@@ -305,96 +309,15 @@ public final class KvTablet {
                     PaddingRow latestSchemaRow = new 
PaddingRow(latestRowType.getFieldCount());
                     // get offset to track the offset corresponded to the kv 
record
                     long logEndOffsetOfPrevBatch = 
logTablet.localLogEndOffset();
+
                     try {
-                        long logOffset = logEndOffsetOfPrevBatch;
-
-                        // TODO: reuse the read context and decoder
-                        KvRecordBatch.ReadContext readContext =
-                                
KvRecordReadContext.createReadContext(kvFormat, schemaGetter);
-                        ValueDecoder valueDecoder = new 
ValueDecoder(schemaGetter, kvFormat);
-                        for (KvRecord kvRecord : 
kvRecords.records(readContext)) {
-                            byte[] keyBytes = 
BytesUtils.toArray(kvRecord.getKey());
-                            KvPreWriteBuffer.Key key = 
KvPreWriteBuffer.Key.of(keyBytes);
-                            BinaryRow row = kvRecord.getRow();
-                            BinaryValue currentValue =
-                                    row == null ? null : new 
BinaryValue(schemaIdOfNewData, row);
-                            if (currentValue == null) {
-                                DeleteBehavior deleteBehavior = 
currentMerger.deleteBehavior();
-                                if (deleteBehavior == DeleteBehavior.IGNORE) {
-                                    // skip delete rows if the merger doesn't 
support yet
-                                    continue;
-                                } else if (deleteBehavior == 
DeleteBehavior.DISABLE) {
-                                    throw new DeletionDisabledException(
-                                            "Delete operations are disabled 
for this table. "
-                                                    + "The 
table.delete.behavior is set to 'disable'.");
-                                }
-                                // it's for deletion
-                                byte[] oldValueBytes = getFromBufferOrKv(key);
-
-                                if (oldValueBytes == null) {
-                                    // there might be large amount of such 
deletion, so we don't log
-                                    LOG.debug(
-                                            "The specific key can't be found 
in kv tablet although the kv record is for deletion, "
-                                                    + "ignore it directly as 
it doesn't exist in the kv tablet yet.");
-                                } else {
-                                    BinaryValue oldValue = 
valueDecoder.decodeValue(oldValueBytes);
-                                    BinaryValue newValue = 
currentMerger.delete(oldValue);
-                                    // if newRow is null, it means the row 
should be deleted
-                                    if (newValue == null) {
-                                        walBuilder.append(
-                                                ChangeType.DELETE,
-                                                
latestSchemaRow.replaceRow(oldValue.row));
-                                        kvPreWriteBuffer.delete(key, 
logOffset++);
-                                    } else {
-                                        // otherwise, it's a partial update, 
should produce -U,+U
-                                        walBuilder.append(
-                                                ChangeType.UPDATE_BEFORE,
-                                                
latestSchemaRow.replaceRow(oldValue.row));
-                                        walBuilder.append(
-                                                ChangeType.UPDATE_AFTER,
-                                                
latestSchemaRow.replaceRow(newValue.row));
-                                        kvPreWriteBuffer.put(
-                                                key, newValue.encodeValue(), 
logOffset + 1);
-                                        logOffset += 2;
-                                    }
-                                }
-                            } else {
-                                // upsert operation
-                                byte[] oldValueBytes = getFromBufferOrKv(key);
-                                // it's update
-                                if (oldValueBytes != null) {
-                                    BinaryValue oldValue = 
valueDecoder.decodeValue(oldValueBytes);
-                                    BinaryValue newValue =
-                                            currentMerger.merge(oldValue, 
currentValue);
-                                    if (newValue == oldValue) {
-                                        // newValue is the same to oldValue, 
means nothing
-                                        // happens (no update/delete), and 
input should be ignored
-                                        continue;
-                                    }
-
-                                    walBuilder.append(
-                                            ChangeType.UPDATE_BEFORE,
-                                            
latestSchemaRow.replaceRow(oldValue.row));
-                                    walBuilder.append(
-                                            ChangeType.UPDATE_AFTER,
-                                            
latestSchemaRow.replaceRow(newValue.row));
-                                    // logOffset is for -U, logOffset + 1 is 
for +U, we need to use
-                                    // the log offset for +U
-                                    kvPreWriteBuffer.put(
-                                            key, newValue.encodeValue(), 
logOffset + 1);
-                                    logOffset += 2;
-                                } else {
-                                    // it's insert
-                                    // TODO: we should add guarantees that all 
non-specified columns
-                                    //  of the input row are set to null.
-                                    walBuilder.append(
-                                            ChangeType.INSERT,
-                                            
latestSchemaRow.replaceRow(currentValue.row));
-                                    kvPreWriteBuffer.put(
-                                            key, currentValue.encodeValue(), 
logOffset++);
-                                }
-                            }
-                        }
+                        processKvRecords(
+                                kvRecords,
+                                kvRecords.schemaId(),
+                                currentMerger,
+                                walBuilder,
+                                latestSchemaRow,
+                                logEndOffsetOfPrevBatch);
 
                         // There will be a situation that these batches of 
kvRecordBatch have not
                         // generated any CDC logs, for example, when client 
attempts to delete
@@ -431,6 +354,175 @@ public final class KvTablet {
                 });
     }
 
+    private void validateSchemaId(short schemaIdOfNewData, short 
latestSchemaId) {
+        if (schemaIdOfNewData > latestSchemaId || schemaIdOfNewData < 0) {
+            // TODO: we may need to support retriable exception here
+            throw new SchemaNotExistException(
+                    "Invalid schema id: "
+                            + schemaIdOfNewData
+                            + ", latest schema id: "
+                            + latestSchemaId);
+        }
+    }
+
+    private void processKvRecords(
+            KvRecordBatch kvRecords,
+            short schemaIdOfNewData,
+            RowMerger currentMerger,
+            WalBuilder walBuilder,
+            PaddingRow latestSchemaRow,
+            long startLogOffset)
+            throws Exception {
+        long logOffset = startLogOffset;
+
+        // TODO: reuse the read context and decoder
+        KvRecordBatch.ReadContext readContext =
+                KvRecordReadContext.createReadContext(kvFormat, schemaGetter);
+        ValueDecoder valueDecoder = new ValueDecoder(schemaGetter, kvFormat);
+
+        for (KvRecord kvRecord : kvRecords.records(readContext)) {
+            byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
+            KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
+            BinaryRow row = kvRecord.getRow();
+            BinaryValue currentValue = row == null ? null : new 
BinaryValue(schemaIdOfNewData, row);
+
+            if (currentValue == null) {
+                logOffset =
+                        processDeletion(
+                                key,
+                                currentMerger,
+                                valueDecoder,
+                                walBuilder,
+                                latestSchemaRow,
+                                logOffset);
+            } else {
+                logOffset =
+                        processUpsert(
+                                key,
+                                currentValue,
+                                currentMerger,
+                                valueDecoder,
+                                walBuilder,
+                                latestSchemaRow,
+                                logOffset);
+            }
+        }
+    }
+
+    private long processDeletion(
+            KvPreWriteBuffer.Key key,
+            RowMerger currentMerger,
+            ValueDecoder valueDecoder,
+            WalBuilder walBuilder,
+            PaddingRow latestSchemaRow,
+            long logOffset)
+            throws Exception {
+        DeleteBehavior deleteBehavior = currentMerger.deleteBehavior();
+        if (deleteBehavior == DeleteBehavior.IGNORE) {
+            // skip delete rows if the merger doesn't support deletion yet
+            return logOffset;
+        } else if (deleteBehavior == DeleteBehavior.DISABLE) {
+            throw new DeletionDisabledException(
+                    "Delete operations are disabled for this table. "
+                            + "The table.delete.behavior is set to 
'disable'.");
+        }
+
+        byte[] oldValueBytes = getFromBufferOrKv(key);
+        if (oldValueBytes == null) {
+            LOG.debug(
+                    "The specific key can't be found in kv tablet although the 
kv record is for deletion, "
+                            + "ignore it directly as it doesn't exist in the 
kv tablet yet.");
+            return logOffset;
+        }
+
+        BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes);
+        BinaryValue newValue = currentMerger.delete(oldValue);
+
+        // if newValue is null, it means the row should be deleted
+        if (newValue == null) {
+            return applyDelete(key, oldValue, walBuilder, latestSchemaRow, 
logOffset);
+        } else {
+            return applyUpdate(key, oldValue, newValue, walBuilder, 
latestSchemaRow, logOffset);
+        }
+    }
+
+    private long processUpsert(
+            KvPreWriteBuffer.Key key,
+            BinaryValue currentValue,
+            RowMerger currentMerger,
+            ValueDecoder valueDecoder,
+            WalBuilder walBuilder,
+            PaddingRow latestSchemaRow,
+            long logOffset)
+            throws Exception {
+        // Optimization: when using WAL mode and merger is DefaultRowMerger 
(full update, not
+        // partial update), we can skip fetching old value for better 
performance since it
+        // always returns new value. In this case, both INSERT and UPDATE will 
produce
+        // UPDATE_AFTER.
+        if (changelogImage == ChangelogImage.WAL && currentMerger instanceof 
DefaultRowMerger) {
+            return applyUpdate(key, null, currentValue, walBuilder, 
latestSchemaRow, logOffset);
+        }
+
+        byte[] oldValueBytes = getFromBufferOrKv(key);
+        if (oldValueBytes == null) {
+            return applyInsert(key, currentValue, walBuilder, latestSchemaRow, 
logOffset);
+        }
+
+        BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes);
+        BinaryValue newValue = currentMerger.merge(oldValue, currentValue);
+
+        if (newValue == oldValue) {
+            // no actual change, skip this record
+            return logOffset;
+        }
+
+        return applyUpdate(key, oldValue, newValue, walBuilder, 
latestSchemaRow, logOffset);
+    }
+
+    private long applyDelete(
+            KvPreWriteBuffer.Key key,
+            BinaryValue oldValue,
+            WalBuilder walBuilder,
+            PaddingRow latestSchemaRow,
+            long logOffset)
+            throws Exception {
+        walBuilder.append(ChangeType.DELETE, 
latestSchemaRow.replaceRow(oldValue.row));
+        kvPreWriteBuffer.delete(key, logOffset);
+        return logOffset + 1;
+    }
+
+    private long applyInsert(
+            KvPreWriteBuffer.Key key,
+            BinaryValue currentValue,
+            WalBuilder walBuilder,
+            PaddingRow latestSchemaRow,
+            long logOffset)
+            throws Exception {
+        walBuilder.append(ChangeType.INSERT, 
latestSchemaRow.replaceRow(currentValue.row));
+        kvPreWriteBuffer.put(key, currentValue.encodeValue(), logOffset);
+        return logOffset + 1;
+    }
+
+    private long applyUpdate(
+            KvPreWriteBuffer.Key key,
+            BinaryValue oldValue,
+            BinaryValue newValue,
+            WalBuilder walBuilder,
+            PaddingRow latestSchemaRow,
+            long logOffset)
+            throws Exception {
+        if (changelogImage == ChangelogImage.WAL) {
+            walBuilder.append(ChangeType.UPDATE_AFTER, 
latestSchemaRow.replaceRow(newValue.row));
+            kvPreWriteBuffer.put(key, newValue.encodeValue(), logOffset);
+            return logOffset + 1;
+        } else {
+            walBuilder.append(ChangeType.UPDATE_BEFORE, 
latestSchemaRow.replaceRow(oldValue.row));
+            walBuilder.append(ChangeType.UPDATE_AFTER, 
latestSchemaRow.replaceRow(newValue.row));
+            kvPreWriteBuffer.put(key, newValue.encodeValue(), logOffset + 1);
+            return logOffset + 2;
+        }
+    }
+
     private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws 
Exception {
         switch (logFormat) {
             case INDEXED:
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
index f2013a91a..94d41bffe 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
@@ -176,9 +176,8 @@ class KvTabletTest {
             SchemaGetter schemaGetter,
             Map<String, String> tableConfig)
             throws Exception {
-        RowMerger rowMerger =
-                RowMerger.create(
-                        new TableConfig(Configuration.fromMap(tableConfig)), 
KvFormat.COMPACTED);
+        TableConfig tableConf = new 
TableConfig(Configuration.fromMap(tableConfig));
+        RowMerger rowMerger = RowMerger.create(tableConf, KvFormat.COMPACTED);
         return KvTablet.create(
                 tablePath,
                 tableBucket,
@@ -191,7 +190,8 @@ class KvTabletTest {
                 KvFormat.COMPACTED,
                 rowMerger,
                 DEFAULT_COMPRESSION,
-                schemaGetter);
+                schemaGetter,
+                tableConf.getChangelogImage());
     }
 
     @Test
@@ -1044,6 +1044,138 @@ class KvTabletTest {
         assertThat(kvTablet.getKvPreWriteBuffer().getMaxLSN()).isEqualTo(9);
     }
 
+    @Test
+    void testWalModeChangelogImageNoUpdateBefore() throws Exception {
+        // WAL mode - no UPDATE_BEFORE. With default merge engine and full row 
update,
+        // optimization converts INSERT to UPDATE_AFTER
+        Map<String, String> config = new HashMap<>();
+        config.put("table.changelog.image", "WAL");
+        initLogTabletAndKvTablet(DATA1_SCHEMA_PK, config);
+        RowType rowType = DATA1_SCHEMA_PK.getRowType();
+
+        // Insert two records
+        List<KvRecord> kvData1 =
+                Arrays.asList(
+                        kvRecordFactory.ofRecord("k1".getBytes(), new Object[] 
{1, "v11"}),
+                        kvRecordFactory.ofRecord("k2".getBytes(), new Object[] 
{2, "v21"}));
+        KvRecordBatch kvRecordBatch1 = kvRecordBatchFactory.ofRecords(kvData1);
+        kvTablet.putAsLeader(kvRecordBatch1, null);
+        long endOffset = logTablet.localLogEndOffset();
+
+        // Verify inserts produce +U (optimization in WAL mode with default 
merge engine and full
+        // row update)
+        LogRecords actualLogRecords = readLogRecords();
+        MemoryLogRecords expectedLogs =
+                logRecords(
+                        0L,
+                        Arrays.asList(ChangeType.UPDATE_AFTER, 
ChangeType.UPDATE_AFTER),
+                        Arrays.asList(new Object[] {1, "v11"}, new Object[] 
{2, "v21"}));
+        checkEqual(actualLogRecords, Collections.singletonList(expectedLogs));
+
+        // Update the records - should only produce UPDATE_AFTER, no 
UPDATE_BEFORE
+        List<KvRecord> kvData2 =
+                Arrays.asList(
+                        kvRecordFactory.ofRecord("k1".getBytes(), new Object[] 
{1, "v12"}),
+                        kvRecordFactory.ofRecord("k2".getBytes(), new Object[] 
{2, "v22"}));
+        KvRecordBatch kvRecordBatch2 = kvRecordBatchFactory.ofRecords(kvData2);
+        kvTablet.putAsLeader(kvRecordBatch2, null);
+
+        // Verify updates only produce +U, not -U
+        actualLogRecords = readLogRecords(endOffset);
+        expectedLogs =
+                logRecords(
+                        endOffset,
+                        Arrays.asList(ChangeType.UPDATE_AFTER, 
ChangeType.UPDATE_AFTER),
+                        Arrays.asList(new Object[] {1, "v12"}, new Object[] 
{2, "v22"}));
+        checkEqual(actualLogRecords, Collections.singletonList(expectedLogs));
+        endOffset = logTablet.localLogEndOffset();
+
+        // Delete one record - should still produce DELETE
+        List<KvRecord> kvData3 =
+                
Collections.singletonList(kvRecordFactory.ofRecord("k1".getBytes(), null));
+        KvRecordBatch kvRecordBatch3 = kvRecordBatchFactory.ofRecords(kvData3);
+        kvTablet.putAsLeader(kvRecordBatch3, null);
+
+        // Verify delete produces -D
+        actualLogRecords = readLogRecords(endOffset);
+        expectedLogs =
+                logRecords(
+                        endOffset,
+                        Collections.singletonList(ChangeType.DELETE),
+                        Collections.singletonList(new Object[] {1, "v12"}));
+        checkEqual(actualLogRecords, Collections.singletonList(expectedLogs));
+
+        // Verify KV store has correct final state
+        
assertThat(kvTablet.getKvPreWriteBuffer().get(Key.of("k1".getBytes()))).isNotNull();
+        assertThat(kvTablet.getKvPreWriteBuffer().get(Key.of("k2".getBytes())))
+                .isEqualTo(valueOf(compactedRow(rowType, new Object[] {2, 
"v22"})));
+    }
+
+    @Test
+    void testWalModeChangelogImageNoUpdateBeforeWithPartialUpdate() throws 
Exception {
+        // WAL mode with partial update - INSERT produces INSERT, UPDATE 
produces UPDATE_AFTER
+        // only (no optimization applied)
+        Map<String, String> config = new HashMap<>();
+        config.put("table.changelog.image", "WAL");
+        initLogTabletAndKvTablet(DATA2_SCHEMA, config);
+        RowType rowType = DATA2_SCHEMA.getRowType();
+        KvRecordTestUtils.KvRecordFactory data2kvRecordFactory =
+                KvRecordTestUtils.KvRecordFactory.of(rowType);
+
+        // Insert with partial columns (column a only)
+        KvRecordBatch kvRecordBatch1 =
+                kvRecordBatchFactory.ofRecords(
+                        data2kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, null, 
null}));
+        kvTablet.putAsLeader(kvRecordBatch1, new int[] {0});
+
+        long endOffset = logTablet.localLogEndOffset();
+        // Verify insert produces +I (partial update goes through normal path)
+        LogRecords actualLogRecords = readLogRecords();
+        MemoryLogRecords expectedLogs =
+                logRecords(
+                        rowType,
+                        0L,
+                        Collections.singletonList(ChangeType.INSERT),
+                        Collections.singletonList(new Object[] {1, null, 
null}));
+        checkEqual(actualLogRecords, Collections.singletonList(expectedLogs), 
rowType);
+
+        // Update with partial columns (column b)
+        KvRecordBatch kvRecordBatch2 =
+                kvRecordBatchFactory.ofRecords(
+                        data2kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, "v1", 
null}));
+        kvTablet.putAsLeader(kvRecordBatch2, new int[] {0, 1});
+
+        endOffset = logTablet.localLogEndOffset();
+        // Verify update only produces +U (no -U since using WAL mode)
+        actualLogRecords = readLogRecords(endOffset - 1);
+        expectedLogs =
+                logRecords(
+                        rowType,
+                        endOffset - 1,
+                        Collections.singletonList(ChangeType.UPDATE_AFTER),
+                        Collections.singletonList(new Object[] {1, "v1", 
null}));
+        checkEqual(actualLogRecords, Collections.singletonList(expectedLogs), 
rowType);
+
+        // Update with partial columns (column c) - column b value should be 
retained
+        KvRecordBatch kvRecordBatch3 =
+                kvRecordBatchFactory.ofRecords(
+                        data2kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, null, 
"hello"}));
+        kvTablet.putAsLeader(kvRecordBatch3, new int[] {0, 2});
+
+        // Verify update produces +U with column b retained as "v1" and column 
c set to "hello"
+        actualLogRecords = readLogRecords(endOffset);
+        expectedLogs =
+                logRecords(
+                        rowType,
+                        endOffset,
+                        Collections.singletonList(ChangeType.UPDATE_AFTER),
+                        Collections.singletonList(new Object[] {1, "v1", 
"hello"}));
+        checkEqual(actualLogRecords, Collections.singletonList(expectedLogs), 
rowType);
+    }
+
     private LogRecords readLogRecords() throws Exception {
         return readLogRecords(0L);
     }
diff --git a/website/docs/engine-flink/options.md 
b/website/docs/engine-flink/options.md
index a126b85f3..6c637240e 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -85,6 +85,7 @@ See more details about [ALTER TABLE ... 
SET](engine-flink/ddl.md#set-properties)
 | table.merge-engine                      | Enum     | (None)                  
            | Defines the merge engine for the primary key table. By default, 
primary key table uses the [default merge 
engine(last_row)](table-design/table-types/pk-table/merge-engines/default.md). 
It also supports two other merge engines: `first_row` and `versioned`. The 
[first_row merge 
engine](table-design/table-types/pk-table/merge-engines/first-row.md) will keep 
the first row of the same primary key. The [v [...]
 | table.merge-engine.versioned.ver-column | String   | (None)                  
            | The column name of the version column for the `versioned` merge 
engine. If the merge engine is set to `versioned`, the version column must be 
set.                                                                            
                                                                                
                                                                                
                  [...]
 | table.delete.behavior                   | Enum     | ALLOW                   
            | Controls the behavior of delete operations on primary key tables. 
Three modes are supported: `ALLOW` (default) - allows normal delete operations; 
`IGNORE` - silently ignores delete requests without errors; `DISABLE` - rejects 
delete requests and throws explicit errors. This configuration provides 
system-level guarantees for some downstream pipelines (e.g., Flink Delta Join) 
that must not receive  [...]
+| table.changelog.image                   | Enum     | FULL                    
            | Defines the changelog image mode for primary key tables. This 
configuration is inspired by similar settings in database systems like MySQL's 
`binlog_row_image` and PostgreSQL's `replica identity`. Two modes are 
supported: `FULL` (default) - produces both UPDATE_BEFORE and UPDATE_AFTER 
records for update operations, capturing complete information about updates and 
allowing tracking of previous val [...]
 
 
 ## Read Options


Reply via email to