liting liu created FLINK-39797:
----------------------------------
Summary: [Flink CDC][Fluss] Support disabling automatic table creation and fail when target table does not exist
Key: FLINK-39797
URL: https://issues.apache.org/jira/browse/FLINK-39797
Project: Flink
Issue Type: New Feature
Components: Flink CDC
Environment: Flink CDC pipeline connector: Fluss sink.
Observed in the release-3.5 code path and on current master, in
FlussMetaDataApplier.applyCreateTable.
Reporter: liting liu
h2. Motivation
The Fluss pipeline sink currently always creates the target Fluss
database/table when it receives a CreateTableEvent and the target table does
not exist.
In some production environments, target Fluss tables are expected to be managed
explicitly by the platform or by users. The CDC job should not silently create
target tables because table properties, bucket keys, primary keys, and other
Fluss-specific layout choices may need to be reviewed before the job starts
writing data.
This is similar to the motivation in FLINK-37677, but the desired behavior here
is stricter for Fluss: when automatic table creation is disabled and the target
table does not exist, the job should fail with a clear error instead of
creating the table.
h2. Current behavior
In the Fluss pipeline sink, FlussMetaDataApplier.applyCreateTable currently
does roughly the following:
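{code:java}
// The database is always created (ignoreIfExists = true).
admin.createDatabase(tablePath.getDatabaseName(), DatabaseDescriptor.EMPTY, true);
if (!admin.tableExists(tablePath).get()) {
    // Missing table: created from the schema inferred from the CreateTableEvent.
    admin.createTable(tablePath, inferredFlussTable, false).get();
} else {
    // Existing table: validated against the inferred table instead.
    TableInfo currentTableInfo = admin.getTableInfo(tablePath).get();
    sanityCheck(inferredFlussTable, currentTableInfo);
}
{code}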
As a result, a missing target Fluss database and table are created
automatically, and there is no Fluss sink option to turn this behavior off.
Setting schema.change.behavior: exception does not solve this either.
CreateTableEvent is required by the CDC schema pipeline to initialize its
internal schema state, so it may still be applied to the sink.
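The dependency on CreateTableEvent can be illustrated with a deliberately simplified model. SchemaStateSketch below is hypothetical and is not the Flink CDC SchemaRegistry / SchemaOperator API; it only shows that the CDC side needs the schema recorded from CreateTableEvent, because a later schema change has no base schema to evolve otherwise. This is why the event cannot simply be dropped, and why the switch is proposed in the Fluss sink instead.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, heavily simplified model of CDC-side schema state.
 * NOT the Flink CDC API; it only illustrates the ordering dependency.
 */
final class SchemaStateSketch {
    // tableId -> ordered column names known to the pipeline
    private final Map<String, List<String>> schemas = new HashMap<>();

    /** Models handling a CreateTableEvent: records the initial schema. */
    void onCreateTable(String tableId, List<String> columns) {
        schemas.put(tableId, new ArrayList<>(columns));
    }

    /** Models handling an AddColumnEvent: requires a known base schema. */
    void onAddColumn(String tableId, String column) {
        List<String> columns = schemas.get(tableId);
        if (columns == null) {
            throw new IllegalStateException(
                    "No schema for " + tableId + "; CreateTableEvent was never processed");
        }
        columns.add(column);
    }

    /** Returns an immutable snapshot of the evolved schema (empty if unknown). */
    List<String> columnsOf(String tableId) {
        List<String> columns = schemas.get(tableId);
        return columns == null ? List.of() : List.copyOf(columns);
    }
}
{code}

If the sink skipped CreateTableEvent entirely, the onAddColumn call for that table would have nothing to apply the change to, which is the consistency concern discussed in the related issues.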
h2. Expected behavior
Add a Fluss sink option to control automatic target table creation, for example:
{code:yaml}
sink:
  type: fluss
  bootstrap.servers: localhost:9123
  auto.create.table: false
{code}
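In the connector, the option would presumably be declared as a boolean ConfigOption, for example via ConfigOptions.key("auto.create.table").booleanType().defaultValue(true). To keep the sketch free of Flink dependencies, the hypothetical helper AutoCreateTableOption below resolves the same semantics from a plain map of sink settings: an absent key yields true (the current behavior), and only "true" or "false" (case-insensitive) is accepted. Rejecting other values is a choice of this sketch, not confirmed Flink CDC behavior.

{code:java}
import java.util.Locale;
import java.util.Map;

/**
 * Hypothetical helper (not part of Flink CDC) showing how a boolean sink
 * option with a backward-compatible default of true would be resolved.
 */
final class AutoCreateTableOption {
    static final String KEY = "auto.create.table";
    // Default keeps today's behavior: missing Fluss tables are created.
    static final boolean DEFAULT_VALUE = true;

    static boolean resolve(Map<String, String> sinkConfig) {
        String raw = sinkConfig.get(KEY);
        if (raw == null) {
            return DEFAULT_VALUE; // option not set: existing pipelines are unaffected
        }
        switch (raw.trim().toLowerCase(Locale.ROOT)) {
            case "true":
                return true;
            case "false":
                return false;
            default:
                // Reject typos such as "flase" instead of silently using the default.
                throw new IllegalArgumentException(
                        "Invalid value for " + KEY + ": '" + raw + "' (expected true or false)");
        }
    }
}
{code}

Failing on an unrecognized value matters here: silently falling back to true would re-enable automatic table creation exactly when a user tried to disable it.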
When auto.create.table is false:
* Flink CDC should still process CreateTableEvent internally so that
SchemaRegistry / SchemaOperator can maintain the evolved schema.
* The Fluss sink should not create the target database or table automatically.
* If the target Fluss table does not exist, the job should fail with a clear
ValidationException, for example:
{code}
Target Fluss table <database>.<table> does not exist and auto.create.table is false.
{code}
If the target table exists, the existing sanityCheck should still run to
validate primary keys, bucket keys, and partition keys.
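The kind of comparison the sanity check performs on an existing table can be sketched as follows. KeyLayoutCheck and its Layout type are hypothetical; the real FlussMetaDataApplier.sanityCheck works on TableDescriptor / TableInfo and may be more lenient (for example about default bucket keys). Strict equality of each key list is an assumption of this sketch.

{code:java}
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of a key-layout check between the table inferred from a
 * CreateTableEvent and an existing Fluss table. Strict list equality is assumed.
 */
final class KeyLayoutCheck {
    static final class Layout {
        final List<String> primaryKeys;
        final List<String> bucketKeys;
        final List<String> partitionKeys;

        Layout(List<String> primaryKeys, List<String> bucketKeys, List<String> partitionKeys) {
            this.primaryKeys = List.copyOf(primaryKeys);
            this.bucketKeys = List.copyOf(bucketKeys);
            this.partitionKeys = List.copyOf(partitionKeys);
        }
    }

    /** Returns human-readable mismatches; an empty list means the layouts agree. */
    static List<String> mismatches(Layout inferred, Layout existing) {
        List<String> problems = new ArrayList<>();
        compare("primary keys", inferred.primaryKeys, existing.primaryKeys, problems);
        compare("bucket keys", inferred.bucketKeys, existing.bucketKeys, problems);
        compare("partition keys", inferred.partitionKeys, existing.partitionKeys, problems);
        return problems;
    }

    private static void compare(
            String what, List<String> inferred, List<String> existing, List<String> problems) {
        if (!inferred.equals(existing)) {
            problems.add(what + " differ: inferred " + inferred + ", existing " + existing);
        }
    }
}
{code}

Collecting all mismatches rather than stopping at the first one gives the user a single, complete error when a manually created table does not match the CDC source.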
h2. Proposed implementation
Add a FlussDataSink option such as auto.create.table (default true for backward
compatibility), pass it into FlussMetaDataApplier, and update applyCreateTable:
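{code:java}
// Skip database creation as well when automatic creation is disabled.
if (autoCreateTable) {
    admin.createDatabase(tablePath.getDatabaseName(), DatabaseDescriptor.EMPTY, true);
}
if (!admin.tableExists(tablePath).get()) {
    if (!autoCreateTable) {
        // Fail fast instead of creating a table the platform is expected to manage.
        throw new ValidationException(
                "Target Fluss table "
                        + tablePath
                        + " does not exist and auto.create.table is false.");
    }
    admin.createTable(tablePath, inferredFlussTable, false).get();
} else {
    // Existing table: validate primary, bucket, and partition keys as before.
    TableInfo currentTableInfo = admin.getTableInfo(tablePath).get();
    sanityCheck(inferredFlussTable, currentTableInfo);
}
{code}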
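To check this control flow without a Fluss cluster, it can be replayed against a hypothetical in-memory, synchronous stand-in for the Admin client. CreateTableFlowSketch and TargetTableMissingException are illustrative names (the latter stands in for Flink's ValidationException); this is a sketch of the proposal, not the connector code. It shows that with auto.create.table set to false a missing table produces the proposed error message and leaves no database or table behind, while an existing table still goes through the sanity-check path.

{code:java}
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical stand-in for the Fluss Admin client plus the proposed
 * applyCreateTable control flow. Not the real Fluss or Flink CDC API.
 */
final class CreateTableFlowSketch {
    /** Stand-in for Flink's ValidationException. */
    static final class TargetTableMissingException extends RuntimeException {
        TargetTableMissingException(String message) {
            super(message);
        }
    }

    final Set<String> databases = new HashSet<>();
    final Set<String> tables = new HashSet<>(); // entries are "<database>.<table>"
    int sanityChecks = 0;

    private final boolean autoCreateTable;

    CreateTableFlowSketch(boolean autoCreateTable) {
        this.autoCreateTable = autoCreateTable;
    }

    void applyCreateTable(String database, String table) {
        String tablePath = database + "." + table;
        if (autoCreateTable) {
            databases.add(database); // like createDatabase(..., ignoreIfExists = true)
        }
        if (!tables.contains(tablePath)) {
            if (!autoCreateTable) {
                // Fail fast; nothing is created on the Fluss side.
                throw new TargetTableMissingException(
                        "Target Fluss table " + tablePath
                                + " does not exist and auto.create.table is false.");
            }
            tables.add(tablePath);
        } else {
            sanityChecks++; // existing table: validate keys instead of creating
        }
    }
}
{code}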
h2. Related issues
* FLINK-37677: Sink support skip create table event
* PR #4015 proposed skipping the application of CreateTableEvent to the external
system while keeping the event in the CDC internal schema state. That PR was
closed as stale, and a schema consistency concern was also raised on it.
* FLINK-37837 highlights that CreateTableEvent is foundational for subsequent
schema handling and should not simply be dropped from CDC internal processing.
h2. Compatibility
The default should preserve the current behavior (auto.create.table: true) to
avoid breaking existing Fluss CDC pipeline users.