This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new e5d10ac61 [kv] Support "table.changelog.image" configuration and WAL
image mode for Primary Key Tables (#2105)
e5d10ac61 is described below
commit e5d10ac61ef4b211bbce1901ece61e172942e7c7
Author: Yang Wang <[email protected]>
AuthorDate: Sat Dec 20 00:23:26 2025 +0800
[kv] Support "table.changelog.image" configuration and WAL image mode for
Primary Key Tables (#2105)
Co-authored-by: Jark Wu <[email protected]>
---
.../org/apache/fluss/config/ConfigOptions.java | 16 ++
.../java/org/apache/fluss/config/TableConfig.java | 9 +
.../org/apache/fluss/metadata/ChangelogImage.java | 59 ++++
.../fluss/flink/source/FlinkTableSource.java | 25 +-
.../fluss/flink/sink/FlinkTableSinkITCase.java | 70 +++++
.../java/org/apache/fluss/server/kv/KvManager.java | 15 +-
.../java/org/apache/fluss/server/kv/KvTablet.java | 298 ++++++++++++++-------
.../org/apache/fluss/server/kv/KvTabletTest.java | 140 +++++++++-
website/docs/engine-flink/options.md | 1 +
9 files changed, 518 insertions(+), 115 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index b33ffc983..7eb203999 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -20,6 +20,7 @@ package org.apache.fluss.config;
import org.apache.fluss.annotation.Internal;
import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.compression.ArrowCompressionType;
+import org.apache.fluss.metadata.ChangelogImage;
import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.KvFormat;
@@ -1446,6 +1447,21 @@ public class ConfigOptions {
+ "The auto increment column can only be
used in primary-key table. The data type of the auto increment column must be
INT or BIGINT."
+ "Currently a table can have only one
auto-increment column.");
+ public static final ConfigOption<ChangelogImage> TABLE_CHANGELOG_IMAGE = // enum-typed table option; defaults to FULL
+ key("table.changelog.image")
+ .enumType(ChangelogImage.class)
+ .defaultValue(ChangelogImage.FULL)
+ .withDescription(
+ "Defines the changelog image mode for the primary
key table. "
+ + "This configuration is inspired by
similar settings in database systems like MySQL's binlog_row_image and
PostgreSQL's replica identity. "
+ + "The supported modes are `FULL`
(default) and `WAL`. "
+ + "The `FULL` mode produces both
UPDATE_BEFORE and UPDATE_AFTER records for update operations, capturing
complete information about updates and allowing tracking of previous values. "
+ + "The `WAL` mode does not produce
UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if allowed)
records are emitted. "
+ + "When WAL mode is enabled with default
merge engine (no merge engine configured) and full row updates (not partial
update), an optimization is applied to skip looking up old values, "
+ + "and in this case INSERT operations are
converted to UPDATE_AFTER events. "
+ + "This mode reduces storage and
transmission costs but loses the ability to track previous values. "
+ + "This option only affects primary key
tables.");
+
// ------------------------------------------------------------------------
// ConfigOptions for Kv
// ------------------------------------------------------------------------
diff --git
a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
index a1422f460..fc7966ab0 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
@@ -19,6 +19,7 @@ package org.apache.fluss.config;
import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.compression.ArrowCompressionInfo;
+import org.apache.fluss.metadata.ChangelogImage;
import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.KvFormat;
@@ -117,6 +118,14 @@ public class TableConfig {
return config.getOptional(ConfigOptions.TABLE_DELETE_BEHAVIOR);
}
+ /**
+ * Gets the changelog image mode of the table, read from {@link
+ * ConfigOptions#TABLE_CHANGELOG_IMAGE}. The changelog image mode defines what information is
+ * included in the changelog for update operations.
+ *
+ * @return the changelog image mode configured for this table
+ */
+ public ChangelogImage getChangelogImage() {
+ return config.get(ConfigOptions.TABLE_CHANGELOG_IMAGE);
+ }
+
/** Gets the Arrow compression type and compression level of the table. */
public ArrowCompressionInfo getArrowCompressionInfo() {
return ArrowCompressionInfo.fromConf(config);
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java
b/fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java
new file mode 100644
index 000000000..02f7a5750
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+/**
+ * The changelog image mode for the primary key table.
+ *
+ * <p>This enum defines what information is included in the changelog for update operations. It
+ * is inspired by similar configurations in database systems like MySQL binlog_row_image and
+ * PostgreSQL replica identity.
+ *
+ * @since 0.9
+ */
+public enum ChangelogImage {
+
+ /**
+ * Full changelog with both UPDATE_BEFORE and UPDATE_AFTER records. This is the default
+ * behavior that captures complete information about updates, allowing tracking of previous
+ * values.
+ */
+ FULL,
+
+ /**
+ * WAL mode does not produce UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if
+ * allowed) records are emitted. When WAL mode is enabled with default merge engine (no merge
+ * engine configured) and full row updates (not partial update), an optimization is applied to
+ * skip looking up old values, and in this case INSERT operations are converted to UPDATE_AFTER
+ * events, similar to database WAL (Write-Ahead Log) behavior. This mode reduces storage and
+ * transmission costs but loses the ability to track previous values.
+ */
+ WAL;
+
+ /**
+ * Creates a {@link ChangelogImage} from the given string. Matching is case-insensitive because
+ * the input is upper-cased before it is compared with the constant names.
+ *
+ * <p>NOTE(review): {@code toUpperCase()} is called without an explicit locale, so the default
+ * locale is used, and a {@code null} argument fails with a NullPointerException instead of the
+ * IllegalArgumentException thrown for unknown names. Consider {@code Locale.ROOT} and an
+ * explicit null check.
+ *
+ * @param image the textual name of the mode, in any letter case
+ * @return the matching {@link ChangelogImage}
+ * @throws IllegalArgumentException if the string names neither FULL nor WAL
+ */
+ public static ChangelogImage fromString(String image) {
+ switch (image.toUpperCase()) {
+ case "FULL":
+ return FULL;
+ case "WAL":
+ return WAL;
+
+ default:
+ throw new IllegalArgumentException("Unsupported changelog
image: " + image);
+ }
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index ef4b63812..2394921f1 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -31,6 +31,7 @@ import org.apache.fluss.flink.utils.PushdownUtils;
import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
import org.apache.fluss.lake.source.LakeSource;
import org.apache.fluss.lake.source.LakeSplit;
+import org.apache.fluss.metadata.ChangelogImage;
import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.MergeEngineType;
import org.apache.fluss.metadata.TablePath;
@@ -206,10 +207,32 @@ public class FlinkTableSource
if (mergeEngineType == MergeEngineType.FIRST_ROW) {
return ChangelogMode.insertOnly();
} else {
- // Check delete behavior configuration
Configuration tableConf =
Configuration.fromMap(tableOptions);
DeleteBehavior deleteBehavior =
tableConf.get(ConfigOptions.TABLE_DELETE_BEHAVIOR);
+ ChangelogImage changelogImage =
+ tableConf.get(ConfigOptions.TABLE_CHANGELOG_IMAGE);
+ if (changelogImage == ChangelogImage.WAL) {
+ // When using WAL mode, produce INSERT and
UPDATE_AFTER (and DELETE if
+ // allowed), without UPDATE_BEFORE. Note: with default
merge engine and full
+ // row updates, an optimization converts INSERT to
UPDATE_AFTER.
+ if (deleteBehavior == DeleteBehavior.ALLOW) {
+ // DELETE is still produced when delete behavior
is allowed
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+ } else {
+ // No DELETE when delete operations are ignored or
disabled
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .build();
+ }
+ }
+
+ // Using FULL mode, produce full changelog
if (deleteBehavior == DeleteBehavior.ALLOW) {
return ChangelogMode.all();
} else {
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index c8aeaaae8..c373dd998 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -1384,4 +1384,74 @@ abstract class FlinkTableSinkITCase extends
AbstractTestBase {
assertResultsIgnoreOrder(rowIter, expectedRows, true);
}
}
+
+ @Test
+ void testWalModeWithDefaultMergeEngineAndAggregation() throws Exception { // WAL-mode PK table: write, batch UPDATE/DELETE, then GROUP BY over the changelog
+ String tableName = "wal_mode_pk_table";
+ // Create a table with WAL mode and default merge engine
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " id int not null,"
+ + " category string,"
+ + " amount bigint,"
+ + " primary key (id) not enforced"
+ + ") with ('table.changelog.image' = 'wal')",
+ tableName));
+
+ // Insert initial data
+ tEnv.executeSql(
+ String.format(
+ "INSERT INTO %s VALUES "
+ + "(1, 'A', 100), "
+ + "(2, 'B', 200), "
+ + "(3, 'A', 150), "
+ + "(4, 'B', 250)",
+ tableName))
+ .await();
+
+ // Use batch mode to update and delete records
+ tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 120 WHERE
id = 1").await();
+ tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 180 WHERE
id = 3").await();
+ tBatchEnv.executeSql("DELETE FROM " + tableName + " WHERE id =
4").await();
+
+ // Do aggregation on the table and verify ChangelogNormalize node is
generated
+ String aggQuery =
+ String.format(
+ "SELECT category, SUM(amount) as total_amount FROM %s
/*+ OPTIONS('scan.startup.mode' = 'earliest') */ GROUP BY category",
+ tableName);
+
+ // Explain the aggregation query to check for ChangelogNormalize
+ String aggPlan = tEnv.explainSql(aggQuery);
+ // ChangelogNormalize should be present to normalize the changelog for
aggregation
+ // In Flink, when the source produces changelog with primary key
semantics (I, UA, D),
+ // a ChangelogNormalize operator is inserted before aggregation
+ assertThat(aggPlan).contains("ChangelogNormalize");
+
+ // Execute the aggregation and verify the result
+ CloseableIterator<Row> aggIter = tEnv.executeSql(aggQuery).collect();
+
+ // Final totals are listed here; the list below is the whole changelog that leads to them:
+ // Category A: 120 (id=1) + 180 (id=3) = 300
+ // Category B: 200 (id=2) = 200 (id=4 was deleted)
+ List<String> expectedAggResults =
+ Arrays.asList(
+ "+I[A, 100]",
+ "-U[A, 100]",
+ "+U[A, 250]",
+ "-U[A, 250]",
+ "+U[A, 150]",
+ "-U[A, 150]",
+ "+U[A, 270]",
+ "-U[A, 270]",
+ "+U[A, 120]",
+ "-U[A, 120]",
+ "+U[A, 300]",
+ "+I[B, 250]",
+ "-D[B, 250]",
+ "+I[B, 200]");
+
+ // Compare the collected rows with the expected changelog, ignoring order
+ assertResultsIgnoreOrder(aggIter, expectedAggResults, true);
+ }
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
index d1e24f6bc..b5318ef60 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
@@ -185,7 +185,8 @@ public final class KvManager extends TabletManagerBase {
kvFormat,
merger,
arrowCompressionInfo,
- schemaGetter);
+ schemaGetter,
+ tableConfig.getChangelogImage());
currentKvs.put(tableBucket, tablet);
LOG.info(
@@ -277,9 +278,8 @@ public final class KvManager extends TabletManagerBase {
TablePath tablePath = physicalTablePath.getTablePath();
TableInfo tableInfo = getTableInfo(zkClient, tablePath);
- RowMerger rowMerger =
- RowMerger.create(
- tableInfo.getTableConfig(),
tableInfo.getTableConfig().getKvFormat());
+ TableConfig tableConfig = tableInfo.getTableConfig();
+ RowMerger rowMerger = RowMerger.create(tableConfig,
tableConfig.getKvFormat());
KvTablet kvTablet =
KvTablet.create(
physicalTablePath,
@@ -290,10 +290,11 @@ public final class KvManager extends TabletManagerBase {
serverMetricGroup,
arrowBufferAllocator,
memorySegmentPool,
- tableInfo.getTableConfig().getKvFormat(),
+ tableConfig.getKvFormat(),
rowMerger,
- tableInfo.getTableConfig().getArrowCompressionInfo(),
- schemaGetter);
+ tableConfig.getArrowCompressionInfo(),
+ schemaGetter,
+ tableConfig.getChangelogImage());
if (this.currentKvs.containsKey(tableBucket)) {
throw new IllegalStateException(
String.format(
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index e6542d8f4..1f1fb5f0f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -25,6 +25,7 @@ import org.apache.fluss.exception.DeletionDisabledException;
import org.apache.fluss.exception.KvStorageException;
import org.apache.fluss.exception.SchemaNotExistException;
import org.apache.fluss.memory.MemorySegmentPool;
+import org.apache.fluss.metadata.ChangelogImage;
import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.LogFormat;
@@ -49,6 +50,7 @@ import
org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason;
import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder;
import org.apache.fluss.server.kv.rocksdb.RocksDBResourceContainer;
+import org.apache.fluss.server.kv.rowmerger.DefaultRowMerger;
import org.apache.fluss.server.kv.rowmerger.RowMerger;
import org.apache.fluss.server.kv.snapshot.KvFileHandleAndLocalPath;
import org.apache.fluss.server.kv.snapshot.KvSnapshotDataUploader;
@@ -114,6 +116,9 @@ public final class KvTablet {
private final SchemaGetter schemaGetter;
+ // changelog image mode of this tablet; in WAL mode updates are logged without UPDATE_BEFORE
+ private final ChangelogImage changelogImage;
+
/**
* The kv data in pre-write buffer whose log offset is less than the
flushedLogOffset has been
* flushed into kv.
@@ -137,7 +142,8 @@ public final class KvTablet {
KvFormat kvFormat,
RowMerger rowMerger,
ArrowCompressionInfo arrowCompressionInfo,
- SchemaGetter schemaGetter) {
+ SchemaGetter schemaGetter,
+ ChangelogImage changelogImage) {
this.physicalPath = physicalPath;
this.tableBucket = tableBucket;
this.logTablet = logTablet;
@@ -152,6 +158,7 @@ public final class KvTablet {
this.rowMerger = rowMerger;
this.arrowCompressionInfo = arrowCompressionInfo;
this.schemaGetter = schemaGetter;
+ this.changelogImage = changelogImage;
}
public static KvTablet create(
@@ -164,7 +171,8 @@ public final class KvTablet {
KvFormat kvFormat,
RowMerger rowMerger,
ArrowCompressionInfo arrowCompressionInfo,
- SchemaGetter schemaGetter)
+ SchemaGetter schemaGetter,
+ ChangelogImage changelogImage)
throws IOException {
Tuple2<PhysicalTablePath, TableBucket> tablePathAndBucket =
FlussPaths.parseTabletDir(kvTabletDir);
@@ -180,7 +188,8 @@ public final class KvTablet {
kvFormat,
rowMerger,
arrowCompressionInfo,
- schemaGetter);
+ schemaGetter,
+ changelogImage);
}
public static KvTablet create(
@@ -195,7 +204,8 @@ public final class KvTablet {
KvFormat kvFormat,
RowMerger rowMerger,
ArrowCompressionInfo arrowCompressionInfo,
- SchemaGetter schemaGetter)
+ SchemaGetter schemaGetter,
+ ChangelogImage changelogImage)
throws IOException {
RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir);
return new KvTablet(
@@ -212,7 +222,8 @@ public final class KvTablet {
kvFormat,
rowMerger,
arrowCompressionInfo,
- schemaGetter);
+ schemaGetter,
+ changelogImage);
}
private static RocksDBKv buildRocksDBKv(Configuration configuration, File
kvDir)
@@ -282,20 +293,13 @@ public final class KvTablet {
SchemaInfo schemaInfo = schemaGetter.getLatestSchemaInfo();
Schema latestSchema = schemaInfo.getSchema();
short latestSchemaId = (short) schemaInfo.getSchemaId();
- short schemaIdOfNewData = kvRecords.schemaId();
- if (schemaIdOfNewData > latestSchemaId ||
schemaIdOfNewData < 0) {
- // TODO: we may need to support retriable exception
here
- throw new SchemaNotExistException(
- "Invalid schema id: "
- + schemaIdOfNewData
- + ", latest schema id: "
- + latestSchemaId);
- }
+ validateSchemaId(kvRecords.schemaId(), latestSchemaId);
// we only support ADD COLUMN, so targetColumns is fine to
be used directly
RowMerger currentMerger =
rowMerger.configureTargetColumns(
targetColumns, latestSchemaId,
latestSchema);
+
RowType latestRowType = latestSchema.getRowType();
WalBuilder walBuilder = createWalBuilder(latestSchemaId,
latestRowType);
walBuilder.setWriterState(kvRecords.writerId(),
kvRecords.batchSequence());
@@ -305,96 +309,15 @@ public final class KvTablet {
PaddingRow latestSchemaRow = new
PaddingRow(latestRowType.getFieldCount());
// get offset to track the offset corresponded to the kv
record
long logEndOffsetOfPrevBatch =
logTablet.localLogEndOffset();
+
try {
- long logOffset = logEndOffsetOfPrevBatch;
-
- // TODO: reuse the read context and decoder
- KvRecordBatch.ReadContext readContext =
-
KvRecordReadContext.createReadContext(kvFormat, schemaGetter);
- ValueDecoder valueDecoder = new
ValueDecoder(schemaGetter, kvFormat);
- for (KvRecord kvRecord :
kvRecords.records(readContext)) {
- byte[] keyBytes =
BytesUtils.toArray(kvRecord.getKey());
- KvPreWriteBuffer.Key key =
KvPreWriteBuffer.Key.of(keyBytes);
- BinaryRow row = kvRecord.getRow();
- BinaryValue currentValue =
- row == null ? null : new
BinaryValue(schemaIdOfNewData, row);
- if (currentValue == null) {
- DeleteBehavior deleteBehavior =
currentMerger.deleteBehavior();
- if (deleteBehavior == DeleteBehavior.IGNORE) {
- // skip delete rows if the merger doesn't
support yet
- continue;
- } else if (deleteBehavior ==
DeleteBehavior.DISABLE) {
- throw new DeletionDisabledException(
- "Delete operations are disabled
for this table. "
- + "The
table.delete.behavior is set to 'disable'.");
- }
- // it's for deletion
- byte[] oldValueBytes = getFromBufferOrKv(key);
-
- if (oldValueBytes == null) {
- // there might be large amount of such
deletion, so we don't log
- LOG.debug(
- "The specific key can't be found
in kv tablet although the kv record is for deletion, "
- + "ignore it directly as
it doesn't exist in the kv tablet yet.");
- } else {
- BinaryValue oldValue =
valueDecoder.decodeValue(oldValueBytes);
- BinaryValue newValue =
currentMerger.delete(oldValue);
- // if newRow is null, it means the row
should be deleted
- if (newValue == null) {
- walBuilder.append(
- ChangeType.DELETE,
-
latestSchemaRow.replaceRow(oldValue.row));
- kvPreWriteBuffer.delete(key,
logOffset++);
- } else {
- // otherwise, it's a partial update,
should produce -U,+U
- walBuilder.append(
- ChangeType.UPDATE_BEFORE,
-
latestSchemaRow.replaceRow(oldValue.row));
- walBuilder.append(
- ChangeType.UPDATE_AFTER,
-
latestSchemaRow.replaceRow(newValue.row));
- kvPreWriteBuffer.put(
- key, newValue.encodeValue(),
logOffset + 1);
- logOffset += 2;
- }
- }
- } else {
- // upsert operation
- byte[] oldValueBytes = getFromBufferOrKv(key);
- // it's update
- if (oldValueBytes != null) {
- BinaryValue oldValue =
valueDecoder.decodeValue(oldValueBytes);
- BinaryValue newValue =
- currentMerger.merge(oldValue,
currentValue);
- if (newValue == oldValue) {
- // newValue is the same to oldValue,
means nothing
- // happens (no update/delete), and
input should be ignored
- continue;
- }
-
- walBuilder.append(
- ChangeType.UPDATE_BEFORE,
-
latestSchemaRow.replaceRow(oldValue.row));
- walBuilder.append(
- ChangeType.UPDATE_AFTER,
-
latestSchemaRow.replaceRow(newValue.row));
- // logOffset is for -U, logOffset + 1 is
for +U, we need to use
- // the log offset for +U
- kvPreWriteBuffer.put(
- key, newValue.encodeValue(),
logOffset + 1);
- logOffset += 2;
- } else {
- // it's insert
- // TODO: we should add guarantees that all
non-specified columns
- // of the input row are set to null.
- walBuilder.append(
- ChangeType.INSERT,
-
latestSchemaRow.replaceRow(currentValue.row));
- kvPreWriteBuffer.put(
- key, currentValue.encodeValue(),
logOffset++);
- }
- }
- }
+ processKvRecords(
+ kvRecords,
+ kvRecords.schemaId(),
+ currentMerger,
+ walBuilder,
+ latestSchemaRow,
+ logEndOffsetOfPrevBatch);
// There will be a situation that these batches of
kvRecordBatch have not
// generated any CDC logs, for example, when client
attempts to delete
@@ -431,6 +354,175 @@ public final class KvTablet {
});
}
+ private void validateSchemaId(short schemaIdOfNewData, short
latestSchemaId) { // rejects a batch whose schema id is negative or newer than the latest schema id
+ if (schemaIdOfNewData > latestSchemaId || schemaIdOfNewData < 0) {
+ // TODO: we may need to support retriable exception here
+ throw new SchemaNotExistException(
+ "Invalid schema id: "
+ + schemaIdOfNewData
+ + ", latest schema id: "
+ + latestSchemaId);
+ }
+ }
+
+ private void processKvRecords( // walks the batch and dispatches each record to the delete or upsert handler
+ KvRecordBatch kvRecords,
+ short schemaIdOfNewData,
+ RowMerger currentMerger,
+ WalBuilder walBuilder,
+ PaddingRow latestSchemaRow,
+ long startLogOffset)
+ throws Exception {
+ long logOffset = startLogOffset; // next log offset to assign; each handler returns the advanced value
+
+ // TODO: reuse the read context and decoder
+ KvRecordBatch.ReadContext readContext =
+ KvRecordReadContext.createReadContext(kvFormat, schemaGetter);
+ ValueDecoder valueDecoder = new ValueDecoder(schemaGetter, kvFormat);
+
+ for (KvRecord kvRecord : kvRecords.records(readContext)) {
+ byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
+ KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
+ BinaryRow row = kvRecord.getRow();
+ BinaryValue currentValue = row == null ? null : new
BinaryValue(schemaIdOfNewData, row);
+
+ if (currentValue == null) { // a record without a row is a deletion
+ logOffset =
+ processDeletion(
+ key,
+ currentMerger,
+ valueDecoder,
+ walBuilder,
+ latestSchemaRow,
+ logOffset);
+ } else { // a record with a row is an upsert
+ logOffset =
+ processUpsert(
+ key,
+ currentValue,
+ currentMerger,
+ valueDecoder,
+ walBuilder,
+ latestSchemaRow,
+ logOffset);
+ }
+ }
+ }
+
+ private long processDeletion( // handles a record that has no row; returns the next log offset to use
+ KvPreWriteBuffer.Key key,
+ RowMerger currentMerger,
+ ValueDecoder valueDecoder,
+ WalBuilder walBuilder,
+ PaddingRow latestSchemaRow,
+ long logOffset)
+ throws Exception {
+ DeleteBehavior deleteBehavior = currentMerger.deleteBehavior();
+ if (deleteBehavior == DeleteBehavior.IGNORE) {
+ // delete behavior is IGNORE: drop the delete silently, no changelog record, offset unchanged
+ return logOffset;
+ } else if (deleteBehavior == DeleteBehavior.DISABLE) {
+ throw new DeletionDisabledException(
+ "Delete operations are disabled for this table. "
+ + "The table.delete.behavior is set to
'disable'.");
+ }
+
+ byte[] oldValueBytes = getFromBufferOrKv(key);
+ if (oldValueBytes == null) {
+ LOG.debug(
+ "The specific key can't be found in kv tablet although the
kv record is for deletion, "
+ + "ignore it directly as it doesn't exist in the
kv tablet yet.");
+ return logOffset; // nothing stored for this key: no changelog record, offset unchanged
+ }
+
+ BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes);
+ BinaryValue newValue = currentMerger.delete(oldValue);
+
+ // a null result from the merger means the whole row is removed; otherwise the row is updated
+ if (newValue == null) {
+ return applyDelete(key, oldValue, walBuilder, latestSchemaRow,
logOffset);
+ } else {
+ return applyUpdate(key, oldValue, newValue, walBuilder,
latestSchemaRow, logOffset);
+ }
+ }
+
+ private long processUpsert( // handles a record that carries a row; returns the next log offset to use
+ KvPreWriteBuffer.Key key,
+ BinaryValue currentValue,
+ RowMerger currentMerger,
+ ValueDecoder valueDecoder,
+ WalBuilder walBuilder,
+ PaddingRow latestSchemaRow,
+ long logOffset)
+ throws Exception {
+ // Optimization: when using WAL mode and merger is DefaultRowMerger
(full update, not
+ // partial update), we can skip fetching old value for better
performance since it
+ // always returns new value. In this case, both INSERT and UPDATE will
produce
+ // UPDATE_AFTER.
+ if (changelogImage == ChangelogImage.WAL && currentMerger instanceof
DefaultRowMerger) {
+ return applyUpdate(key, null, currentValue, walBuilder,
latestSchemaRow, logOffset); // oldValue is null here; the WAL branch of applyUpdate never reads it
+ }
+
+ byte[] oldValueBytes = getFromBufferOrKv(key);
+ if (oldValueBytes == null) { // no stored value for this key: treat as a fresh insert
+ return applyInsert(key, currentValue, walBuilder, latestSchemaRow,
logOffset);
+ }
+
+ BinaryValue oldValue = valueDecoder.decodeValue(oldValueBytes);
+ BinaryValue newValue = currentMerger.merge(oldValue, currentValue);
+
+ if (newValue == oldValue) {
+ // the merger handed back the very same old value object: no actual change, skip this record
+ return logOffset;
+ }
+
+ return applyUpdate(key, oldValue, newValue, walBuilder,
latestSchemaRow, logOffset);
+ }
+
+ private long applyDelete( // logs a DELETE carrying the old row and removes the key in the pre-write buffer
+ KvPreWriteBuffer.Key key,
+ BinaryValue oldValue,
+ WalBuilder walBuilder,
+ PaddingRow latestSchemaRow,
+ long logOffset)
+ throws Exception {
+ walBuilder.append(ChangeType.DELETE,
latestSchemaRow.replaceRow(oldValue.row));
+ kvPreWriteBuffer.delete(key, logOffset); // buffered under the offset of the DELETE record
+ return logOffset + 1; // one changelog record consumed
+ }
+
+ private long applyInsert( // logs an INSERT carrying the new row and puts the value into the pre-write buffer
+ KvPreWriteBuffer.Key key,
+ BinaryValue currentValue,
+ WalBuilder walBuilder,
+ PaddingRow latestSchemaRow,
+ long logOffset)
+ throws Exception {
+ walBuilder.append(ChangeType.INSERT,
latestSchemaRow.replaceRow(currentValue.row));
+ kvPreWriteBuffer.put(key, currentValue.encodeValue(), logOffset); // buffered under the offset of the INSERT record
+ return logOffset + 1; // one changelog record consumed
+ }
+
+ private long applyUpdate( // logs an update, puts the new value into the pre-write buffer, returns the next log offset
+ KvPreWriteBuffer.Key key,
+ BinaryValue oldValue, // null when called from the WAL fast path in processUpsert
+ BinaryValue newValue,
+ WalBuilder walBuilder,
+ PaddingRow latestSchemaRow,
+ long logOffset)
+ throws Exception {
+ if (changelogImage == ChangelogImage.WAL) { // WAL image: only UPDATE_AFTER is written, oldValue is not read
+ walBuilder.append(ChangeType.UPDATE_AFTER,
latestSchemaRow.replaceRow(newValue.row));
+ kvPreWriteBuffer.put(key, newValue.encodeValue(), logOffset);
+ return logOffset + 1; // one changelog record consumed
+ } else { // FULL image: UPDATE_BEFORE followed by UPDATE_AFTER
+ walBuilder.append(ChangeType.UPDATE_BEFORE,
latestSchemaRow.replaceRow(oldValue.row));
+ walBuilder.append(ChangeType.UPDATE_AFTER,
latestSchemaRow.replaceRow(newValue.row));
+ kvPreWriteBuffer.put(key, newValue.encodeValue(), logOffset + 1); // logOffset belongs to -U, logOffset + 1 to +U
+ return logOffset + 2; // two changelog records consumed
+ }
+ }
+
private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws
Exception {
switch (logFormat) {
case INDEXED:
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
index f2013a91a..94d41bffe 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
@@ -176,9 +176,8 @@ class KvTabletTest {
SchemaGetter schemaGetter,
Map<String, String> tableConfig)
throws Exception {
- RowMerger rowMerger =
- RowMerger.create(
- new TableConfig(Configuration.fromMap(tableConfig)),
KvFormat.COMPACTED);
+ TableConfig tableConf = new
TableConfig(Configuration.fromMap(tableConfig));
+ RowMerger rowMerger = RowMerger.create(tableConf, KvFormat.COMPACTED);
return KvTablet.create(
tablePath,
tableBucket,
@@ -191,7 +190,8 @@ class KvTabletTest {
KvFormat.COMPACTED,
rowMerger,
DEFAULT_COMPRESSION,
- schemaGetter);
+ schemaGetter,
+ tableConf.getChangelogImage());
}
@Test
@@ -1044,6 +1044,138 @@ class KvTabletTest {
assertThat(kvTablet.getKvPreWriteBuffer().getMaxLSN()).isEqualTo(9);
}
+ @Test
+ void testWalModeChangelogImageNoUpdateBefore() throws Exception {
+ // WAL mode - no UPDATE_BEFORE. With default merge engine and full row
update,
+ // optimization converts INSERT to UPDATE_AFTER
+ Map<String, String> config = new HashMap<>();
+ config.put("table.changelog.image", "WAL");
+ initLogTabletAndKvTablet(DATA1_SCHEMA_PK, config);
+ RowType rowType = DATA1_SCHEMA_PK.getRowType();
+
+ // Insert two records (keys k1 and k2 that do not exist yet)
+ List<KvRecord> kvData1 =
+ Arrays.asList(
+ kvRecordFactory.ofRecord("k1".getBytes(), new Object[]
{1, "v11"}),
+ kvRecordFactory.ofRecord("k2".getBytes(), new Object[]
{2, "v21"}));
+ KvRecordBatch kvRecordBatch1 = kvRecordBatchFactory.ofRecords(kvData1);
+ kvTablet.putAsLeader(kvRecordBatch1, null);
+ long endOffset = logTablet.localLogEndOffset(); // start offset of the next batch
+
+ // Verify inserts produce +U (optimization in WAL mode with default
merge engine and full
+ // row update)
+ LogRecords actualLogRecords = readLogRecords();
+ MemoryLogRecords expectedLogs =
+ logRecords(
+ 0L,
+ Arrays.asList(ChangeType.UPDATE_AFTER,
ChangeType.UPDATE_AFTER),
+ Arrays.asList(new Object[] {1, "v11"}, new Object[]
{2, "v21"}));
+ checkEqual(actualLogRecords, Collections.singletonList(expectedLogs));
+
+ // Update the records - should only produce UPDATE_AFTER, no
UPDATE_BEFORE
+ List<KvRecord> kvData2 =
+ Arrays.asList(
+ kvRecordFactory.ofRecord("k1".getBytes(), new Object[]
{1, "v12"}),
+ kvRecordFactory.ofRecord("k2".getBytes(), new Object[]
{2, "v22"}));
+ KvRecordBatch kvRecordBatch2 = kvRecordBatchFactory.ofRecords(kvData2);
+ kvTablet.putAsLeader(kvRecordBatch2, null);
+
+ // Verify updates only produce +U, not -U
+ actualLogRecords = readLogRecords(endOffset);
+ expectedLogs =
+ logRecords(
+ endOffset,
+ Arrays.asList(ChangeType.UPDATE_AFTER,
ChangeType.UPDATE_AFTER),
+ Arrays.asList(new Object[] {1, "v12"}, new Object[]
{2, "v22"}));
+ checkEqual(actualLogRecords, Collections.singletonList(expectedLogs));
+ endOffset = logTablet.localLogEndOffset();
+
+ // Delete one record - should still produce DELETE
+ List<KvRecord> kvData3 =
+
Collections.singletonList(kvRecordFactory.ofRecord("k1".getBytes(), null));
+ KvRecordBatch kvRecordBatch3 = kvRecordBatchFactory.ofRecords(kvData3);
+ kvTablet.putAsLeader(kvRecordBatch3, null);
+
+ // Verify delete produces -D carrying the last written row of k1
+ actualLogRecords = readLogRecords(endOffset);
+ expectedLogs =
+ logRecords(
+ endOffset,
+ Collections.singletonList(ChangeType.DELETE),
+ Collections.singletonList(new Object[] {1, "v12"}));
+ checkEqual(actualLogRecords, Collections.singletonList(expectedLogs));
+
+ // Verify KV store has correct final state. NOTE(review): k1 was deleted above but is asserted non-null; presumably the pre-write buffer keeps an entry for deleted keys - confirm.
+
assertThat(kvTablet.getKvPreWriteBuffer().get(Key.of("k1".getBytes()))).isNotNull();
+ assertThat(kvTablet.getKvPreWriteBuffer().get(Key.of("k2".getBytes())))
+ .isEqualTo(valueOf(compactedRow(rowType, new Object[] {2,
"v22"})));
+ }
+
+ @Test
+ void testWalModeChangelogImageNoUpdateBeforeWithPartialUpdate() throws
Exception {
+ // WAL mode with partial update - INSERT produces INSERT, UPDATE
produces UPDATE_AFTER
+ // only (no optimization applied)
+ Map<String, String> config = new HashMap<>();
+ config.put("table.changelog.image", "WAL");
+ initLogTabletAndKvTablet(DATA2_SCHEMA, config);
+ RowType rowType = DATA2_SCHEMA.getRowType();
+ KvRecordTestUtils.KvRecordFactory data2kvRecordFactory =
+ KvRecordTestUtils.KvRecordFactory.of(rowType);
+
+ // Insert with partial columns (target column index 0 only)
+ KvRecordBatch kvRecordBatch1 =
+ kvRecordBatchFactory.ofRecords(
+ data2kvRecordFactory.ofRecord(
+ "k1".getBytes(), new Object[] {1, null,
null}));
+ kvTablet.putAsLeader(kvRecordBatch1, new int[] {0});
+
+ long endOffset = logTablet.localLogEndOffset(); // NOTE(review): this value is not read before it is reassigned after the second write
+ // Verify insert produces +I (partial update goes through normal path)
+ LogRecords actualLogRecords = readLogRecords();
+ MemoryLogRecords expectedLogs =
+ logRecords(
+ rowType,
+ 0L,
+ Collections.singletonList(ChangeType.INSERT),
+ Collections.singletonList(new Object[] {1, null,
null}));
+ checkEqual(actualLogRecords, Collections.singletonList(expectedLogs),
rowType);
+
+ // Update with partial columns (target column indexes 0 and 1)
+ KvRecordBatch kvRecordBatch2 =
+ kvRecordBatchFactory.ofRecords(
+ data2kvRecordFactory.ofRecord(
+ "k1".getBytes(), new Object[] {1, "v1",
null}));
+ kvTablet.putAsLeader(kvRecordBatch2, new int[] {0, 1});
+
+ endOffset = logTablet.localLogEndOffset();
+ // Verify update only produces +U (no -U since using WAL mode)
+ actualLogRecords = readLogRecords(endOffset - 1); // the single +U of the second batch sits right before the log end
+ expectedLogs =
+ logRecords(
+ rowType,
+ endOffset - 1,
+ Collections.singletonList(ChangeType.UPDATE_AFTER),
+ Collections.singletonList(new Object[] {1, "v1",
null}));
+ checkEqual(actualLogRecords, Collections.singletonList(expectedLogs),
rowType);
+
+ // Update with partial columns (column c) - column b value should be
retained
+ KvRecordBatch kvRecordBatch3 =
+ kvRecordBatchFactory.ofRecords(
+ data2kvRecordFactory.ofRecord(
+ "k1".getBytes(), new Object[] {1, null,
"hello"}));
+ kvTablet.putAsLeader(kvRecordBatch3, new int[] {0, 2});
+
+ // Verify update produces +U with column b retained as "v1" and column
c set to "hello"
+ actualLogRecords = readLogRecords(endOffset); // endOffset still marks the end of the second batch, i.e. the start of the third
+ expectedLogs =
+ logRecords(
+ rowType,
+ endOffset,
+ Collections.singletonList(ChangeType.UPDATE_AFTER),
+ Collections.singletonList(new Object[] {1, "v1",
"hello"}));
+ checkEqual(actualLogRecords, Collections.singletonList(expectedLogs),
rowType);
+ }
+
private LogRecords readLogRecords() throws Exception {
return readLogRecords(0L);
}
diff --git a/website/docs/engine-flink/options.md
b/website/docs/engine-flink/options.md
index a126b85f3..6c637240e 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -85,6 +85,7 @@ See more details about [ALTER TABLE ...
SET](engine-flink/ddl.md#set-properties)
| table.merge-engine | Enum | (None)
| Defines the merge engine for the primary key table. By default,
primary key table uses the [default merge
engine(last_row)](table-design/table-types/pk-table/merge-engines/default.md).
It also supports two merge engines are `first_row` and `versioned`. The
[first_row merge
engine](table-design/table-types/pk-table/merge-engines/first-row.md) will keep
the first row of the same primary key. The [v [...]
| table.merge-engine.versioned.ver-column | String | (None)
| The column name of the version column for the `versioned` merge
engine. If the merge engine is set to `versioned`, the version column must be
set.
[...]
| table.delete.behavior | Enum | ALLOW
| Controls the behavior of delete operations on primary key tables.
Three modes are supported: `ALLOW` (default) - allows normal delete operations;
`IGNORE` - silently ignores delete requests without errors; `DISABLE` - rejects
delete requests and throws explicit errors. This configuration provides
system-level guarantees for some downstream pipelines (e.g., Flink Delta Join)
that must not receive [...]
+| table.changelog.image | Enum | FULL
| Defines the changelog image mode for primary key tables. This
configuration is inspired by similar settings in database systems like MySQL's
`binlog_row_image` and PostgreSQL's `replica identity`. Two modes are
supported: `FULL` (default) - produces both UPDATE_BEFORE and UPDATE_AFTER
records for update operations, capturing complete information about updates and
allowing tracking of previous val [...]
## Read Options