This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 3ec81a875 [kv] Support 'table.delete.behavior' config to disable or
ignore deletion on the table (#1783)
3ec81a875 is described below
commit 3ec81a875d90dd87f927b65ed15d3102086f4550
Author: CaoZhen <[email protected]>
AuthorDate: Tue Oct 21 22:55:16 2025 +0800
[kv] Support 'table.delete.behavior' config to disable or ignore deletion
on the table (#1783)
---
.../fluss/client/admin/FlussAdminITCase.java | 88 ++++++++++++++
.../org/apache/fluss/config/ConfigOptions.java | 13 +++
.../java/org/apache/fluss/config/TableConfig.java | 6 +
.../fluss/exception/DeletionDisabledException.java | 35 ++++++
.../org/apache/fluss/metadata/DeleteBehavior.java | 62 ++++++++++
.../fluss/flink/catalog/FlinkTableFactory.java | 2 +
.../apache/fluss/flink/sink/FlinkTableSink.java | 27 +++--
.../fluss/flink/source/FlinkTableSource.java | 19 ++-
.../fluss/flink/sink/FlinkTableSinkITCase.java | 128 +++++++++++++++++++++
.../java/org/apache/fluss/rpc/protocol/Errors.java | 5 +-
.../server/coordinator/CoordinatorService.java | 16 +++
.../java/org/apache/fluss/server/kv/KvTablet.java | 9 +-
.../server/kv/rowmerger/DefaultRowMerger.java | 22 ++--
.../server/kv/rowmerger/FirstRowRowMerger.java | 16 ++-
.../fluss/server/kv/rowmerger/RowMerger.java | 19 +--
.../server/kv/rowmerger/VersionedRowMerger.java | 16 ++-
.../server/utils/TableDescriptorValidation.java | 26 +++++
.../server/kv/rowmerger/DefaultRowMergerTest.java | 83 +++++++++++++
.../kv/rowmerger/VersionedRowMergerTest.java | 7 +-
website/docs/engine-flink/options.md | 1 +
20 files changed, 568 insertions(+), 32 deletions(-)
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index 61b8f5f28..860d63f36 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -50,6 +50,7 @@ import org.apache.fluss.fs.FsPath;
import org.apache.fluss.fs.FsPathAndFileName;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DatabaseInfo;
+import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.metadata.PartitionInfo;
@@ -86,6 +87,7 @@ import java.util.stream.Stream;
import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -331,6 +333,92 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
"Table name __internal_table is invalid: '__' is not
allowed as prefix, since it is reserved for internal databases/internal
tables/internal partitions in Fluss server");
}
+ @Test
+ void testCreateTableWithDeleteBehavior() {
+ // Test 1: FIRST_ROW merge engine - should set delete behavior to
IGNORE
+ TablePath tablePath1 = TablePath.of("fluss",
"test_ignore_delete_for_first_row");
+ Map<String, String> properties1 = new HashMap<>();
+ properties1.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "first_row");
+
+ TableDescriptor tableDescriptor1 =
+ TableDescriptor.builder()
+ .schema(DEFAULT_SCHEMA)
+ .comment("first row merge engine table")
+ .properties(properties1)
+ .build();
+ admin.createTable(tablePath1, tableDescriptor1, false).join();
+
+ // Get the table and verify delete behavior is changed to IGNORE
+ TableInfo tableInfo1 = admin.getTableInfo(tablePath1).join();
+
assertThat(tableInfo1.getTableConfig().getDeleteBehavior()).hasValue(DeleteBehavior.IGNORE);
+
+ // Test 2: VERSIONED merge engine - should set delete behavior to
IGNORE
+ TablePath tablePath2 = TablePath.of("fluss",
"test_ignore_delete_for_versioned");
+ Map<String, String> properties2 = new HashMap<>();
+ properties2.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "versioned");
+ properties2.put(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key(),
"age");
+ TableDescriptor tableDescriptor2 =
+ TableDescriptor.builder()
+ .schema(DEFAULT_SCHEMA)
+ .comment("versioned merge engine table")
+ .properties(properties2)
+ .build();
+ admin.createTable(tablePath2, tableDescriptor2, false).join();
+ // Get the table and verify delete behavior is changed to IGNORE
+ TableInfo tableInfo2 = admin.getTableInfo(tablePath2).join();
+
assertThat(tableInfo2.getTableConfig().getDeleteBehavior()).hasValue(DeleteBehavior.IGNORE);
+
+ // Test 3: FIRST_ROW merge engine with delete behavior explicitly set
to ALLOW
+ TablePath tablePath3 = TablePath.of("fluss",
"test_allow_delete_for_first_row");
+ Map<String, String> properties3 = new HashMap<>();
+ properties3.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "first_row");
+ properties3.put(ConfigOptions.TABLE_DELETE_BEHAVIOR.key(), "ALLOW");
+ TableDescriptor tableDescriptor3 =
+ TableDescriptor.builder()
+ .schema(DEFAULT_SCHEMA)
+ .comment("first row merge engine table")
+ .properties(properties3)
+ .build();
+ assertThatThrownBy(() -> admin.createTable(tablePath3,
tableDescriptor3, false).join())
+ .hasRootCauseInstanceOf(InvalidConfigException.class)
+ .hasMessageContaining(
+ "Table with 'FIRST_ROW' merge engine does not support
delete operations. "
+ + "The 'table.delete.behavior' config must be
set to 'ignore' or 'disable', but got 'allow'.");
+
+ // Test 4: VERSIONED merge engine with delete behavior explicitly set
to ALLOW
+ TablePath tablePath4 = TablePath.of("fluss",
"test_allow_delete_for_versioned");
+ Map<String, String> properties4 = new HashMap<>();
+ properties4.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "versioned");
+ properties4.put(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key(),
"age");
+ properties4.put(ConfigOptions.TABLE_DELETE_BEHAVIOR.key(), "ALLOW");
+ TableDescriptor tableDescriptor4 =
+ TableDescriptor.builder()
+ .schema(DEFAULT_SCHEMA)
+ .comment("versioned merge engine table")
+ .properties(properties4)
+ .build();
+ assertThatThrownBy(() -> admin.createTable(tablePath4,
tableDescriptor4, false).join())
+ .hasRootCauseInstanceOf(InvalidConfigException.class)
+ .hasMessageContaining(
+ "Table with 'VERSIONED' merge engine does not support
delete operations. "
+ + "The 'table.delete.behavior' config must be
set to 'ignore' or 'disable', but got 'allow'.");
+
+ // Test 5: Log table - not allow to set delete behavior
+ TablePath tablePath5 = TablePath.of("fluss",
"test_set_delete_behavior_for_log_table");
+ Map<String, String> properties5 = new HashMap<>();
+ properties5.put(ConfigOptions.TABLE_DELETE_BEHAVIOR.key(), "IGNORE");
+ TableDescriptor tableDescriptor5 =
+ TableDescriptor.builder()
+ .schema(DATA1_SCHEMA)
+ .comment("log table")
+ .properties(properties5)
+ .build();
+ assertThatThrownBy(() -> admin.createTable(tablePath5,
tableDescriptor5, false).join())
+ .hasRootCauseInstanceOf(InvalidConfigException.class)
+ .hasMessageContaining(
+ "The 'table.delete.behavior' configuration is only
supported for primary key tables.");
+ }
+
@Test
void testCreateTableWithInvalidProperty() {
TablePath tablePath =
TablePath.of(DEFAULT_TABLE_PATH.getDatabaseName(), "test_property");
diff --git
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 6c75ff3c9..6b045bb38 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -21,6 +21,7 @@ import org.apache.fluss.annotation.Internal;
import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.compression.ArrowCompressionType;
import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.metadata.MergeEngineType;
@@ -1381,6 +1382,18 @@ public class ConfigOptions {
"The column name of the version column for the
`versioned` merge engine. "
+ "If the merge engine is set to
`versioned`, the version column must be set.");
+ public static final ConfigOption<DeleteBehavior> TABLE_DELETE_BEHAVIOR =
+ key("table.delete.behavior")
+ .enumType(DeleteBehavior.class)
+ .defaultValue(DeleteBehavior.ALLOW)
+ .withDescription(
+ "Defines the delete behavior for the primary key
table. "
+ + "The supported delete behaviors are
`allow`, `ignore`, and `disable`. "
+ + "The `allow` behavior allows normal
delete operations (default). "
+ + "The `ignore` behavior silently skips
delete requests without error. "
+ + "The `disable` behavior rejects delete
requests with a clear error message. "
+ + "For tables with FIRST_ROW or VERSIONED
merge engines, this option defaults to `ignore`.");
+
// ------------------------------------------------------------------------
// ConfigOptions for Kv
// ------------------------------------------------------------------------
diff --git
a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
index a1f8ac9cc..a1422f460 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
@@ -20,6 +20,7 @@ package org.apache.fluss.config;
import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.compression.ArrowCompressionInfo;
import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.metadata.MergeEngineType;
@@ -111,6 +112,11 @@ public class TableConfig {
return
config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN);
}
+ /** Gets the delete behavior of the table. */
+ public Optional<DeleteBehavior> getDeleteBehavior() {
+ return config.getOptional(ConfigOptions.TABLE_DELETE_BEHAVIOR);
+ }
+
/** Gets the Arrow compression type and compression level of the table. */
public ArrowCompressionInfo getArrowCompressionInfo() {
return ArrowCompressionInfo.fromConf(config);
diff --git
a/fluss-common/src/main/java/org/apache/fluss/exception/DeletionDisabledException.java
b/fluss-common/src/main/java/org/apache/fluss/exception/DeletionDisabledException.java
new file mode 100644
index 000000000..2b69a42ea
--- /dev/null
+++
b/fluss-common/src/main/java/org/apache/fluss/exception/DeletionDisabledException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.exception;
+
+/**
+ * Exception thrown when deletion operations are disabled on a table. This
exception is used when a
+ * table has been configured with delete behavior set to 'disable', indicating
that deletion
+ * operations are not allowed and should be rejected.
+ *
+ * @see org.apache.fluss.config.ConfigOptions#TABLE_DELETE_BEHAVIOR
+ */
+public class DeletionDisabledException extends ApiException {
+ public DeletionDisabledException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public DeletionDisabledException(String message) {
+ this(message, null);
+ }
+}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metadata/DeleteBehavior.java
b/fluss-common/src/main/java/org/apache/fluss/metadata/DeleteBehavior.java
new file mode 100644
index 000000000..23ebf641d
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/DeleteBehavior.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+/**
+ * The delete behavior for the primary key table.
+ *
+ * <p>This enum defines how delete operations should be handled for primary
key tables. It provides
+ * different strategies to control whether deletions are allowed, ignored, or
explicitly disabled.
+ *
+ * @since 0.8
+ */
+public enum DeleteBehavior {
+
+ /**
+ * Allow normal delete operations. This is the default behavior for
primary key tables without
+ * merge engines.
+ */
+ ALLOW,
+
+ /**
+ * Silently ignore delete requests without error. Delete operations will
be dropped at the
+ * server side, and no deletion will be performed. This is the default
behavior for tables with
+ * FIRST_ROW or VERSIONED merge engines.
+ */
+ IGNORE,
+
+ /**
+ * Reject delete requests with a clear error message. Any attempt to
perform delete operations
+ * will result in an exception being thrown.
+ */
+ DISABLE;
+
+ /** Creates a {@link DeleteBehavior} from the given string. */
+ public static DeleteBehavior fromString(String behavior) {
+ switch (behavior.toUpperCase()) {
+ case "ALLOW":
+ return ALLOW;
+ case "IGNORE":
+ return IGNORE;
+ case "DISABLE":
+ return DISABLE;
+ default:
+ throw new IllegalArgumentException("Unsupported delete
behavior: " + behavior);
+ }
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
index d90b14164..92579e3ac 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
@@ -59,6 +59,7 @@ import java.util.Optional;
import java.util.Set;
import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
+import static org.apache.fluss.config.ConfigOptions.TABLE_DELETE_BEHAVIOR;
import static org.apache.fluss.config.FlussConfigUtils.CLIENT_PREFIX;
import static org.apache.fluss.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER;
import static org.apache.fluss.flink.utils.DataLakeUtils.getDatalakeFormat;
@@ -187,6 +188,7 @@ public class FlinkTableFactory implements
DynamicTableSourceFactory, DynamicTabl
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
tableOptions.get(toFlinkOption(TABLE_DATALAKE_FORMAT)),
tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE),
+ tableOptions.get(toFlinkOption(TABLE_DELETE_BEHAVIOR)),
tableOptions.get(FlinkConnectorOptions.BUCKET_NUMBER),
getBucketKeys(tableOptions),
tableOptions.get(FlinkConnectorOptions.SINK_BUCKET_SHUFFLE));
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java
index 1303f8faf..4edd586e4 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java
@@ -24,6 +24,7 @@ import org.apache.fluss.flink.utils.PushdownUtils;
import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
import org.apache.fluss.flink.utils.PushdownUtils.ValueConversion;
import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.MergeEngineType;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.GenericRow;
@@ -74,7 +75,8 @@ public class FlinkTableSink
private final List<String> partitionKeys;
private final boolean streaming;
@Nullable private final MergeEngineType mergeEngineType;
- private final boolean ignoreDelete;
+ private final boolean sinkIgnoreDelete;
+ private final DeleteBehavior tableDeleteBehavior;
private final int numBucket;
private final List<String> bucketKeys;
private final boolean shuffleByBucketId;
@@ -92,7 +94,8 @@ public class FlinkTableSink
boolean streaming,
@Nullable MergeEngineType mergeEngineType,
@Nullable DataLakeFormat lakeFormat,
- boolean ignoreDelete,
+ boolean sinkIgnoreDelete,
+ DeleteBehavior tableDeleteBehavior,
int numBucket,
List<String> bucketKeys,
boolean shuffleByBucketId) {
@@ -103,7 +106,8 @@ public class FlinkTableSink
this.partitionKeys = partitionKeys;
this.streaming = streaming;
this.mergeEngineType = mergeEngineType;
- this.ignoreDelete = ignoreDelete;
+ this.sinkIgnoreDelete = sinkIgnoreDelete;
+ this.tableDeleteBehavior = tableDeleteBehavior;
this.numBucket = numBucket;
this.bucketKeys = bucketKeys;
this.shuffleByBucketId = shuffleByBucketId;
@@ -115,7 +119,7 @@ public class FlinkTableSink
if (!streaming) {
return ChangelogMode.insertOnly();
} else {
- if (primaryKeyIndexes.length > 0 || ignoreDelete) {
+ if (primaryKeyIndexes.length > 0 || sinkIgnoreDelete) {
// primary-key table or ignore_delete mode can accept
RowKind.DELETE
ChangelogMode.Builder builder = ChangelogMode.newBuilder();
for (RowKind kind : requestedMode.getContainedKinds()) {
@@ -200,7 +204,7 @@ public class FlinkTableSink
partitionKeys,
lakeFormat,
shuffleByBucketId,
- new RowDataSerializationSchema(false,
ignoreDelete))
+ new RowDataSerializationSchema(false,
sinkIgnoreDelete))
: new FlinkSink.AppendSinkWriterBuilder<>(
tablePath,
flussConfig,
@@ -210,7 +214,7 @@ public class FlinkTableSink
partitionKeys,
lakeFormat,
shuffleByBucketId,
- new RowDataSerializationSchema(true,
ignoreDelete));
+ new RowDataSerializationSchema(true,
sinkIgnoreDelete));
return new FlinkSink<>(flinkSinkWriterBuilder);
}
@@ -235,7 +239,8 @@ public class FlinkTableSink
streaming,
mergeEngineType,
lakeFormat,
- ignoreDelete,
+ sinkIgnoreDelete,
+ tableDeleteBehavior,
numBucket,
bucketKeys,
shuffleByBucketId);
@@ -360,6 +365,14 @@ public class FlinkTableSink
"Table %s uses the '%s' merge engine which does
not support DELETE or UPDATE statements.",
tablePath, mergeEngineType));
}
+
+ // Check table-level delete behavior configuration
+ if (tableDeleteBehavior == DeleteBehavior.DISABLE) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Table %s has delete behavior set to 'disable'
which does not support DELETE statements.",
+ tablePath));
+ }
}
private Map<Integer, LogicalType> getPrimaryKeyTypes() {
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index c278e1572..e7c7357e3 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -17,6 +17,7 @@
package org.apache.fluss.flink.source;
+import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.FlinkConnectorOptions;
import org.apache.fluss.flink.source.deserializer.RowDataDeserializationSchema;
@@ -30,6 +31,7 @@ import org.apache.fluss.flink.utils.PushdownUtils;
import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
import org.apache.fluss.lake.source.LakeSource;
import org.apache.fluss.lake.source.LakeSplit;
+import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.MergeEngineType;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.predicate.GreaterOrEqual;
@@ -74,6 +76,7 @@ import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -206,7 +209,21 @@ public class FlinkTableSource
if (mergeEngineType == MergeEngineType.FIRST_ROW) {
return ChangelogMode.insertOnly();
} else {
- return ChangelogMode.all();
+ // Check delete behavior configuration
+ Configuration tableConf =
Configuration.fromMap(tableOptions);
+ DeleteBehavior deleteBehavior =
+ tableConf.get(ConfigOptions.TABLE_DELETE_BEHAVIOR);
+ if (deleteBehavior == DeleteBehavior.ALLOW) {
+ return ChangelogMode.all();
+ } else {
+ // If delete operations are ignored or disabled, only
insert and update are
+ // relevant
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_BEFORE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .build();
+ }
}
} else {
// append only
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index b01a7e4b2..0ea8ed94f 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -36,6 +36,7 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -1095,4 +1096,131 @@ abstract class FlinkTableSinkITCase extends
AbstractTestBase {
this.expectedRows = expectedRows;
}
}
+
+ @Test
+ void testDeleteBehaviorDisabledForDeleteStmt() {
+ String tableName = "delete_behavior_disable_table";
+ tBatchEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a int not null,"
+ + " b bigint null, "
+ + " c string null, "
+ + " primary key (a) not enforced"
+ + ") with ('table.delete.behavior' =
'disable')",
+ tableName));
+
+ TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+ assertThatThrownBy(
+ () ->
+ tBatchEnv
+ .executeSql("DELETE FROM " + tableName
+ " WHERE a = 1")
+ .await())
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage(
+ String.format(
+ "Table %s has delete behavior set to 'disable'
which does not support DELETE statements.",
+ tablePath));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"ignore", "disable", "allow"})
+ void testDeleteBehaviorForInsertStmt(String deleteBehavior) throws
Exception {
+ String tableName = "delete_behavior_ignore_table";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a int not null primary key not enforced,"
+ + " b string"
+ + ") with ('table.delete.behavior' = '%s')",
+ tableName, deleteBehavior));
+
+ // 1. Verify the changelog mode of the table
+ String changelogModePlan =
+ tEnv.explainSql("SELECT * FROM " + tableName,
ExplainDetail.CHANGELOG_MODE);
+ if (deleteBehavior.equals("allow")) {
+ assertThat(changelogModePlan)
+ .contains(
+ "TableSourceScan(table=[[testcatalog, defaultdb,
delete_behavior_ignore_table]], fields=[a, b], "
+ + "changelogMode=[I,UB,UA,D])");
+ } else {
+ // For 'ignore' and 'disable', delete operations are not emitted
in the changelog
+ assertThat(changelogModePlan)
+ .contains(
+ "TableSourceScan(table=[[testcatalog, defaultdb,
delete_behavior_ignore_table]], fields=[a, b], "
+ + "changelogMode=[I,UB,UA])");
+ }
+
+ // 2. Write data including delete operations and verify the final
table state
+
+ // Insert some data
+ tEnv.executeSql(
+ String.format(
+ "INSERT INTO %s VALUES (1, 'test1'), (2,
'test2'), (3, 'test3')",
+ tableName))
+ .await();
+
+ // Create a changelog stream with deletes that should be ignored
+ org.apache.flink.table.api.Table changelogData =
+ tEnv.fromChangelogStream(
+ env.fromCollection(
+ Arrays.asList(
+ Row.ofKind(RowKind.INSERT, 4, "test4"),
+ Row.ofKind(RowKind.DELETE, 1,
"test1"), // Should be ignored
+ Row.ofKind(RowKind.UPDATE_AFTER, 2,
"updated_test2"))));
+ tEnv.createTemporaryView("changelog_source", changelogData);
+
+ // Disable upsert materialization to avoid generate SinkMaterializer
operator,
+ // because we want to see the original delete messages in sink
+ tEnv.getConfig().set("table.exec.sink.upsert-materialize", "NONE");
+ String plan =
+ tEnv.explainSql(
+ String.format("INSERT INTO %s SELECT * FROM
changelog_source", tableName));
+ assertThat(plan).doesNotContain("upsertMaterialize=[true]");
+
+ // Insert changelog data
+ TableResult tableResult =
+ tEnv.executeSql(
+ String.format("INSERT INTO %s SELECT * FROM
changelog_source", tableName));
+
+ // 3. Verify the final table state based on delete behavior
+ if (deleteBehavior.equals("disable")) {
+ // For 'disable', the delete operation is not supported, so we
expect an exception
+ assertThatThrownBy(tableResult::await)
+ .hasStackTraceContaining(
+ "DeletionDisabledException: Delete operations are
disabled for this table."
+ + " The table.delete.behavior is set to
'disable'.");
+ } else {
+ // For 'ignore', the delete operation is ignored, so we just wait
for the insert and
+ // update to be applied
+ tableResult.await();
+ CloseableIterator<Row> rowIter =
+ tEnv.executeSql(String.format("select * from %s",
tableName)).collect();
+
+ final List<String> expectedRows;
+ if (deleteBehavior.equals("ignore")) {
+ // Row with a=1 should still exist (delete was ignored)
+ expectedRows =
+ Arrays.asList(
+ "+I[1, test1]", // Delete was ignored
+ "+I[2, test2]",
+ "-U[2, test2]",
+ "+U[2, updated_test2]",
+ "+I[3, test3]",
+ "+I[4, test4]");
+ } else {
+ // For 'allow', the delete operation should be reflected in
the final state
+ expectedRows =
+ Arrays.asList(
+ "+I[1, test1]",
+ "-D[1, test1]", // a=1 was deleted
+ "+I[2, test2]",
+ "-U[2, test2]",
+ "+U[2, updated_test2]",
+ "+I[3, test3]",
+ "+I[4, test4]");
+ }
+ assertResultsIgnoreOrder(rowIter, expectedRows, true);
+ }
+ }
}
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
index 3b899baee..4b49a5847 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
@@ -25,6 +25,7 @@ import org.apache.fluss.exception.CorruptRecordException;
import org.apache.fluss.exception.DatabaseAlreadyExistException;
import org.apache.fluss.exception.DatabaseNotEmptyException;
import org.apache.fluss.exception.DatabaseNotExistException;
+import org.apache.fluss.exception.DeletionDisabledException;
import org.apache.fluss.exception.DuplicateSequenceException;
import org.apache.fluss.exception.FencedLeaderEpochException;
import org.apache.fluss.exception.FencedTieringEpochException;
@@ -225,7 +226,9 @@ public enum Errors {
"The new ISR contains at least one ineligible replica.",
IneligibleReplicaException::new),
INVALID_ALTER_TABLE_EXCEPTION(
- 56, "The alter table is invalid.",
InvalidAlterTableException::new);
+ 56, "The alter table is invalid.",
InvalidAlterTableException::new),
+ DELETION_DISABLED_EXCEPTION(
+ 57, "Deletion operations are disabled on this table.",
DeletionDisabledException::new);
private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index cebc937bf..8fe9285bb 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -35,6 +35,8 @@ import
org.apache.fluss.exception.TableNotPartitionedException;
import org.apache.fluss.fs.FileSystem;
import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.DeleteBehavior;
+import org.apache.fluss.metadata.MergeEngineType;
import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
import org.apache.fluss.metadata.TableChange;
@@ -404,6 +406,20 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
ConfigOptions.TABLE_DATALAKE_ENABLED.key()));
}
+ // For tables with first_row or versioned merge engines, automatically
set to IGNORE if
+ // delete behavior is not set
+ Configuration tableConf =
Configuration.fromMap(tableDescriptor.getProperties());
+ MergeEngineType mergeEngine =
+
tableConf.getOptional(ConfigOptions.TABLE_MERGE_ENGINE).orElse(null);
+ if (mergeEngine == MergeEngineType.FIRST_ROW || mergeEngine ==
MergeEngineType.VERSIONED) {
+ if (tableDescriptor.hasPrimaryKey()
+ &&
!tableConf.getOptional(ConfigOptions.TABLE_DELETE_BEHAVIOR).isPresent()) {
+ Map<String, String> newProperties = new
HashMap<>(newDescriptor.getProperties());
+ newProperties.put(
+ ConfigOptions.TABLE_DELETE_BEHAVIOR.key(),
DeleteBehavior.IGNORE.name());
+ newDescriptor = newDescriptor.withProperties(newProperties);
+ }
+ }
return newDescriptor;
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index 0437053af..f91d5cc59 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -21,8 +21,10 @@ import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.compression.ArrowCompressionInfo;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.DeletionDisabledException;
import org.apache.fluss.exception.KvStorageException;
import org.apache.fluss.memory.MemorySegmentPool;
+import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.metadata.PhysicalTablePath;
@@ -277,9 +279,14 @@ public final class KvTablet {
byte[] keyBytes =
BytesUtils.toArray(kvRecord.getKey());
KvPreWriteBuffer.Key key =
KvPreWriteBuffer.Key.of(keyBytes);
if (kvRecord.getRow() == null) {
- if (!rowMerger.supportsDelete()) {
+ DeleteBehavior deleteBehavior =
rowMerger.deleteBehavior();
+ if (deleteBehavior == DeleteBehavior.IGNORE) {
// skip delete rows if the merger doesn't
support yet
continue;
+ } else if (deleteBehavior ==
DeleteBehavior.DISABLE) {
+ throw new DeletionDisabledException(
+ "Delete operations are disabled
for this table. "
+ + "The
table.delete.behavior is set to 'disable'.");
}
// it's for deletion
byte[] oldValue = getFromBufferOrKv(key);
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java
index 4cce34d09..049d2859a 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java
@@ -17,6 +17,7 @@
package org.apache.fluss.server.kv.rowmerger;
+import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.row.BinaryRow;
@@ -34,10 +35,14 @@ public class DefaultRowMerger implements RowMerger {
private final PartialUpdaterCache partialUpdaterCache;
private final KvFormat kvFormat;
private final Schema schema;
+ private final DeleteBehavior deleteBehavior;
- public DefaultRowMerger(Schema schema, KvFormat kvFormat) {
+ public DefaultRowMerger(
+ Schema schema, KvFormat kvFormat, @Nullable DeleteBehavior
deleteBehavior) {
this.schema = schema;
this.kvFormat = kvFormat;
+ // for compatibility, default to ALLOW if not specified
+ this.deleteBehavior = deleteBehavior != null ? deleteBehavior :
DeleteBehavior.ALLOW;
// TODO: share cache in server level when PartialUpdater is thread-safe
this.partialUpdaterCache = new PartialUpdaterCache();
}
@@ -57,8 +62,8 @@ public class DefaultRowMerger implements RowMerger {
}
@Override
- public boolean supportsDelete() {
- return true;
+ public DeleteBehavior deleteBehavior() {
+ return deleteBehavior;
}
@Override
@@ -69,7 +74,7 @@ public class DefaultRowMerger implements RowMerger {
// this also sanity checks the validity of the partial update
PartialUpdater partialUpdater =
partialUpdaterCache.getOrCreatePartialUpdater(kvFormat,
schema, targetColumns);
- return new PartialUpdateRowMerger(partialUpdater);
+ return new PartialUpdateRowMerger(partialUpdater, deleteBehavior);
}
}
@@ -77,9 +82,12 @@ public class DefaultRowMerger implements RowMerger {
private static class PartialUpdateRowMerger implements RowMerger {
private final PartialUpdater partialUpdater;
+ private final DeleteBehavior deleteBehavior;
- public PartialUpdateRowMerger(PartialUpdater partialUpdater) {
+ public PartialUpdateRowMerger(
+ PartialUpdater partialUpdater, DeleteBehavior deleteBehavior) {
this.partialUpdater = partialUpdater;
+ this.deleteBehavior = deleteBehavior;
}
@Override
@@ -101,8 +109,8 @@ public class DefaultRowMerger implements RowMerger {
}
@Override
- public boolean supportsDelete() {
- return true;
+ public DeleteBehavior deleteBehavior() {
+ return deleteBehavior;
}
}
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java
index 502552af3..3271f0920 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java
@@ -17,6 +17,7 @@
package org.apache.fluss.server.kv.rowmerger;
+import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.MergeEngineType;
import org.apache.fluss.row.BinaryRow;
@@ -29,6 +30,17 @@ import javax.annotation.Nullable;
*/
public class FirstRowRowMerger implements RowMerger {
+ private final DeleteBehavior deleteBehavior;
+
+ public FirstRowRowMerger(@Nullable DeleteBehavior deleteBehavior) {
+ if (deleteBehavior == DeleteBehavior.ALLOW) {
+ throw new IllegalArgumentException(
+ "DELETE is not supported for the first_row merge engine.");
+ }
+ // for compatibility, default to IGNORE if not specified
+ this.deleteBehavior = deleteBehavior != null ? deleteBehavior :
DeleteBehavior.IGNORE;
+ }
+
@Nullable
@Override
public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) {
@@ -44,8 +56,8 @@ public class FirstRowRowMerger implements RowMerger {
}
@Override
- public boolean supportsDelete() {
- return false;
+ public DeleteBehavior deleteBehavior() {
+ return deleteBehavior;
}
@Override
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java
index 402ff2974..b942cf181 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java
@@ -19,6 +19,7 @@ package org.apache.fluss.server.kv.rowmerger;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.TableConfig;
+import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.MergeEngineType;
import org.apache.fluss.metadata.Schema;
@@ -44,7 +45,8 @@ public interface RowMerger {
/**
* Merge the old row with a delete row.
*
- * <p>This method will be invoked only when {@link #supportsDelete()}
returns true.
+ * <p>This method will be invoked only when {@link #deleteBehavior()}
returns {@link
+ * DeleteBehavior#ALLOW}.
*
* @param oldRow the old row.
* @return the merged row, or null if the row is deleted.
@@ -53,11 +55,11 @@ public interface RowMerger {
BinaryRow delete(BinaryRow oldRow);
/**
- * Whether the merger supports to merge delete rows.
+ * The behavior of delete operations on primary key tables.
*
- * @return true if the merger supports delete operation.
+ * @return {@link DeleteBehavior}
*/
- boolean supportsDelete();
+ DeleteBehavior deleteBehavior();
/** Dynamically configure the target columns to merge and return the
effective merger. */
RowMerger configureTargetColumns(@Nullable int[] targetColumns);
@@ -65,10 +67,12 @@ public interface RowMerger {
/** Create a row merger based on the given configuration. */
static RowMerger create(TableConfig tableConf, Schema schema, KvFormat
kvFormat) {
Optional<MergeEngineType> mergeEngineType =
tableConf.getMergeEngineType();
+ @Nullable DeleteBehavior deleteBehavior =
tableConf.getDeleteBehavior().orElse(null);
+
if (mergeEngineType.isPresent()) {
switch (mergeEngineType.get()) {
case FIRST_ROW:
- return new FirstRowRowMerger();
+ return new FirstRowRowMerger(deleteBehavior);
case VERSIONED:
Optional<String> versionColumn =
tableConf.getMergeEngineVersionColumn();
if (!versionColumn.isPresent()) {
@@ -77,13 +81,14 @@ public interface RowMerger {
"'%s' must be set for versioned merge
engine.",
ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key()));
}
- return new VersionedRowMerger(schema.getRowType(),
versionColumn.get());
+ return new VersionedRowMerger(
+ schema.getRowType(), versionColumn.get(),
deleteBehavior);
default:
throw new IllegalArgumentException(
"Unsupported merge engine type: " +
mergeEngineType.get());
}
} else {
- return new DefaultRowMerger(schema, kvFormat);
+ return new DefaultRowMerger(schema, kvFormat, deleteBehavior);
}
}
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java
index 91ae0fd09..11901b34b 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java
@@ -17,6 +17,7 @@
package org.apache.fluss.server.kv.rowmerger;
+import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.MergeEngineType;
import org.apache.fluss.row.BinaryRow;
import org.apache.fluss.row.TimestampLtz;
@@ -46,8 +47,17 @@ public class VersionedRowMerger implements RowMerger {
private final Comparator<BinaryRow> versionComparator;
- public VersionedRowMerger(RowType schema, String versionColumnName) {
+ private final DeleteBehavior deleteBehavior;
+
+ public VersionedRowMerger(
+ RowType schema, String versionColumnName, @Nullable DeleteBehavior
deleteBehavior) {
this.versionComparator = createVersionComparator(schema,
versionColumnName);
+ if (deleteBehavior == DeleteBehavior.ALLOW) {
+ throw new IllegalArgumentException(
+ "DELETE is not supported for the versioned merge engine.");
+ }
+ // for compatibility, default to IGNORE if not specified
+ this.deleteBehavior = deleteBehavior != null ? deleteBehavior :
DeleteBehavior.IGNORE;
}
@Nullable
@@ -65,8 +75,8 @@ public class VersionedRowMerger implements RowMerger {
}
@Override
- public boolean supportsDelete() {
- return false;
+ public DeleteBehavior deleteBehavior() {
+ return deleteBehavior;
}
@Override
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 76e8adc82..446e8b505 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -26,6 +26,7 @@ import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.exception.InvalidConfigException;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.TooManyBucketsException;
+import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.metadata.MergeEngineType;
@@ -100,6 +101,7 @@ public class TableDescriptorValidation {
checkLogFormat(tableConf, hasPrimaryKey);
checkArrowCompression(tableConf);
checkMergeEngine(tableConf, hasPrimaryKey, schema);
+ checkDeleteBehavior(tableConf, hasPrimaryKey);
checkTieredLog(tableConf);
checkPartition(tableConf, tableDescriptor.getPartitionKeys(), schema);
checkSystemColumns(schema);
@@ -316,6 +318,30 @@ public class TableDescriptorValidation {
}
}
+ private static void checkDeleteBehavior(Configuration tableConf, boolean
hasPrimaryKey) {
+ Optional<DeleteBehavior> deleteBehaviorOptional =
+ tableConf.getOptional(ConfigOptions.TABLE_DELETE_BEHAVIOR);
+ if (!hasPrimaryKey && deleteBehaviorOptional.isPresent()) {
+ throw new InvalidConfigException(
+ "The 'table.delete.behavior' configuration is only
supported for primary key tables.");
+ }
+
+ // For tables with merge engines, automatically set appropriate delete
behavior
+ MergeEngineType mergeEngine =
tableConf.get(ConfigOptions.TABLE_MERGE_ENGINE);
+ if (mergeEngine == MergeEngineType.FIRST_ROW || mergeEngine ==
MergeEngineType.VERSIONED) {
+ // For FIRST_ROW and VERSIONED merge engines, delete operations
are not supported
+ // If user explicitly sets delete behavior to ALLOW, throw an
exception
+ if (deleteBehaviorOptional.isPresent()
+ && deleteBehaviorOptional.get() == DeleteBehavior.ALLOW) {
+ throw new InvalidConfigException(
+ String.format(
+ "Table with '%s' merge engine does not support
delete operations. "
+ + "The 'table.delete.behavior' config
must be set to 'ignore' or 'disable', but got 'allow'.",
+ mergeEngine));
+ }
+ }
+ }
+
private static void validateOptionValue(ReadableConfig options,
ConfigOption<?> option) {
try {
options.get(option);
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMergerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMergerTest.java
new file mode 100644
index 000000000..a7eea5fa9
--- /dev/null
+++
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMergerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.rowmerger;
+
+import org.apache.fluss.metadata.DeleteBehavior;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.row.BinaryRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DefaultRowMerger} delete behavior functionality. */
+class DefaultRowMergerTest {
+
+ private static final Schema SCHEMA =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .primaryKey("id")
+ .build();
+
+ private static final RowType ROW_TYPE = SCHEMA.getRowType();
+
+ private BinaryRow createBinaryRow(int id, String name) {
+ return compactedRow(ROW_TYPE, new Object[] {id, name});
+ }
+
+ @ParameterizedTest
+ @EnumSource(DeleteBehavior.class)
+ void testDefaultRowMerger(DeleteBehavior deleteBehavior) {
+ DefaultRowMerger merger = new DefaultRowMerger(SCHEMA,
KvFormat.COMPACTED, deleteBehavior);
+
+ BinaryRow oldRow = createBinaryRow(1, "old");
+ BinaryRow newRow = createBinaryRow(1, "new");
+
+ // Test merge operation - should return new row
+ BinaryRow mergedRow = merger.merge(oldRow, newRow);
+ assertThat(mergedRow).isSameAs(newRow);
+
+ // Test delete operation - should return null (deleted)
+ BinaryRow deletedRow = merger.delete(oldRow);
+ assertThat(deletedRow).isNull();
+
+ // Test supportsDelete - should return true
+ assertThat(merger.deleteBehavior()).isEqualTo(deleteBehavior);
+ }
+
+ @ParameterizedTest
+ @EnumSource(DeleteBehavior.class)
+ void testPartialUpdateRowMergerDeleteBehavior(DeleteBehavior
deleteBehavior) {
+ DefaultRowMerger merger = new DefaultRowMerger(SCHEMA,
KvFormat.COMPACTED, deleteBehavior);
+
+ // Configure for partial update (only name column)
+ RowMerger partialMerger = merger.configureTargetColumns(new int[] {0,
1}); // id + name
+
+ BinaryRow oldRow = createBinaryRow(1, "old");
+
+ BinaryRow ignoredRow = partialMerger.delete(oldRow);
+ assertThat(ignoredRow).isNull();
+ assertThat(partialMerger.deleteBehavior()).isEqualTo(deleteBehavior);
+ }
+}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java
index 989af9385..45166fd0b 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java
@@ -17,6 +17,7 @@
package org.apache.fluss.server.kv.rowmerger;
+import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.row.BinaryRow;
import org.apache.fluss.row.TimestampLtz;
@@ -128,7 +129,7 @@ class VersionedRowMergerTest {
.column("b", DataTypes.STRING())
.build()
.getRowType();
- VersionedRowMerger merger = new VersionedRowMerger(schema, "a");
+ VersionedRowMerger merger = new VersionedRowMerger(schema, "a",
DeleteBehavior.DISABLE);
for (TestSpec testSpec : testSpecs) {
BinaryRow oldRow = compactedRow(schema, new Object[]
{testSpec.oldValue, "dummy"});
@@ -152,9 +153,9 @@ class VersionedRowMergerTest {
.column("b", DataTypes.STRING())
.build()
.getRowType();
- VersionedRowMerger merger = new VersionedRowMerger(schema, "a");
+ VersionedRowMerger merger = new VersionedRowMerger(schema, "a",
DeleteBehavior.DISABLE);
- assertThat(merger.supportsDelete()).isFalse();
+ assertThat(merger.deleteBehavior()).isEqualTo(DeleteBehavior.DISABLE);
assertThat(merger.configureTargetColumns(null)).isSameAs(merger);
}
diff --git a/website/docs/engine-flink/options.md
b/website/docs/engine-flink/options.md
index d53a468e5..9c71e7077 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -83,6 +83,7 @@ ALTER TABLE log_table SET ('table.log.ttl' = '7d');
| table.datalake.auto-compaction | Boolean | false
| If true, compaction will be triggered automatically when tiering
service writes to the datalake. It is disabled by default.
[...]
| table.merge-engine | Enum | (None)
| Defines the merge engine for the primary key table. By default,
primary key table uses the [default merge
engine(last_row)](table-design/table-types/pk-table/merge-engines/default.md).
It also supports two merge engines are `first_row` and `versioned`. The
[first_row merge
engine](table-design/table-types/pk-table/merge-engines/first-row.md) will keep
the first row of the same primary key. The [v [...]
| table.merge-engine.versioned.ver-column | String | (None)
| The column name of the version column for the `versioned` merge
engine. If the merge engine is set to `versioned`, the version column must be
set.
[...]
+| table.delete.behavior | Enum | ALLOW
| Controls the behavior of delete operations on primary key tables.
Three modes are supported: `ALLOW` (default) - allows normal delete operations;
`IGNORE` - silently ignores delete requests without errors; `DISABLE` - rejects
delete requests and throws explicit errors. This configuration provides
system-level guarantees for some downstream pipelines (e.g., Flink Delta Join)
that must not receive [...]
## Read Options