liting liu created FLINK-39797:
----------------------------------

             Summary: [Flink CDC][Fluss] Support disabling automatic table creation and fail when target table does not exist
                 Key: FLINK-39797
                 URL: https://issues.apache.org/jira/browse/FLINK-39797
             Project: Flink
          Issue Type: New Feature
          Components: Flink CDC
         Environment: Flink CDC pipeline connector: Fluss sink
Observed in the release-3.5 code path and in the current master behavior of 
FlussMetaDataApplier.applyCreateTable.
            Reporter: liting liu


h2. Motivation

The Fluss pipeline sink currently always creates the target Fluss 
database/table when it receives a CreateTableEvent and the target table does 
not exist.

In some production environments, target Fluss tables are expected to be managed 
explicitly by the platform or by users. The CDC job should not silently create 
target tables because table properties, bucket keys, primary keys, and other 
Fluss-specific layout choices may need to be reviewed before the job starts 
writing data.

This is similar to the motivation in FLINK-37677, but the desired behavior here 
is stricter for Fluss: when automatic table creation is disabled and the target 
table does not exist, the job should fail with a clear error instead of 
creating the table.

h2. Current behavior

In the Fluss pipeline sink, FlussMetaDataApplier.applyCreateTable currently 
does roughly the following:

{code:java}
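// Create the target database if it is missing (ignoreIfExists = true).
admin.createDatabase(tablePath.getDatabaseName(), DatabaseDescriptor.EMPTY, true);
if (!admin.tableExists(tablePath).get()) {
    // Missing target table: create it from the inferred Fluss table.
    admin.createTable(tablePath, inferredFlussTable, false).get();
} else {
    // Existing target table: validate it against the inferred Fluss table.
    TableInfo currentTableInfo = admin.getTableInfo(tablePath).get();
    sanityCheck(inferredFlussTable, currentTableInfo);
}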
{code}

As a result, a missing target Fluss table is automatically created. There is no 
Fluss sink option to turn this behavior off.

Setting schema.change.behavior to exception does not solve this: CreateTableEvent 
is required by the CDC schema pipeline to initialize its internal schema state, 
and it may still be applied to the sink.

h2. Expected behavior

Add a Fluss sink option to control automatic target table creation, for example:

{code:yaml}
sink:
  type: fluss
  bootstrap.servers: localhost:9123
  auto.create.table: false
{code}
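To illustrate the intended default, here is a minimal, self-contained sketch of reading the option from a plain key/value view of the sink configuration. The class AutoCreateTableOption and its read method are hypothetical; the real Fluss sink would presumably declare the option through Flink CDC's own configuration API, which is not shown here. The sketch assumes that an absent key means true and that values other than true/false should be rejected.

{code:java}
import java.util.Locale;
import java.util.Map;

/**
 * Hypothetical helper (not part of Flink CDC): reads the proposed
 * auto.create.table option from a plain key/value view of the sink config.
 */
final class AutoCreateTableOption {
    static final String KEY = "auto.create.table";
    // Default true keeps today's behavior for existing pipelines.
    static final boolean DEFAULT = true;

    private AutoCreateTableOption() {}

    static boolean read(Map<String, String> sinkConfig) {
        String raw = sinkConfig.get(KEY);
        if (raw == null) {
            return DEFAULT; // option not set: keep automatic creation
        }
        String normalized = raw.trim().toLowerCase(Locale.ROOT);
        if (normalized.equals("true")) {
            return true;
        }
        if (normalized.equals("false")) {
            return false;
        }
        // Reject typos instead of treating them as false,
        // which is what Boolean.parseBoolean would do.
        throw new IllegalArgumentException(
                "Invalid value for " + KEY + ": '" + raw + "' (expected true or false)");
    }
}
{code}

Rejecting unknown values is a design choice of this sketch: with Boolean.parseBoolean, a typo such as "ture" would silently disable table creation and only surface later as a missing-table failure.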

When auto.create.table is false:

* Flink CDC should still process CreateTableEvent internally so that 
SchemaRegistry / SchemaOperator can maintain the evolved schema.
* Fluss sink should not create the target database/table automatically.
* If the target Fluss table does not exist, the job should fail with a clear 
ValidationException, for example:

{code}
Target Fluss table <database>.<table> does not exist and auto.create.table is false.
{code}

If the target table exists, the existing sanityCheck should still run to 
validate primary keys, bucket keys, and partition keys.
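The key comparison on an existing table can be pictured with the following simplified sketch. KeyLayout and KeyLayoutCheck are hypothetical stand-ins that compare the three key lists as ordered lists of column names; they do not reproduce the actual sanityCheck in FlussMetaDataApplier, whose exact rules are not described in this issue.

{code:java}
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified view of the key layout of a Fluss table. */
final class KeyLayout {
    final List<String> primaryKeys;
    final List<String> bucketKeys;
    final List<String> partitionKeys;

    KeyLayout(List<String> primaryKeys, List<String> bucketKeys, List<String> partitionKeys) {
        this.primaryKeys = List.copyOf(primaryKeys);
        this.bucketKeys = List.copyOf(bucketKeys);
        this.partitionKeys = List.copyOf(partitionKeys);
    }
}

/** Hypothetical check: reports every key list that differs between the two layouts. */
final class KeyLayoutCheck {
    private KeyLayoutCheck() {}

    static List<String> mismatches(KeyLayout inferred, KeyLayout existing) {
        List<String> problems = new ArrayList<>();
        compare("primary keys", inferred.primaryKeys, existing.primaryKeys, problems);
        compare("bucket keys", inferred.bucketKeys, existing.bucketKeys, problems);
        compare("partition keys", inferred.partitionKeys, existing.partitionKeys, problems);
        return problems;
    }

    // Fails with all mismatches at once so the user can fix them in one pass.
    static void check(KeyLayout inferred, KeyLayout existing) {
        List<String> problems = mismatches(inferred, existing);
        if (!problems.isEmpty()) {
            throw new IllegalStateException(String.join("; ", problems));
        }
    }

    private static void compare(
            String what, List<String> inferred, List<String> existing, List<String> problems) {
        if (!inferred.equals(existing)) {
            problems.add(what + " differ: inferred " + inferred + ", existing " + existing);
        }
    }
}
{code}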

h2. Proposed implementation

Add a FlussDataSink option such as auto.create.table (default true for backward 
compatibility), pass it into FlussMetaDataApplier, and update applyCreateTable:

{code:java}
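if (autoCreateTable) {
    // Create the database only when automatic creation is enabled.
    admin.createDatabase(tablePath.getDatabaseName(), DatabaseDescriptor.EMPTY, true);
}
if (!admin.tableExists(tablePath).get()) {
    if (!autoCreateTable) {
        // Fail fast instead of silently creating the table.
        throw new ValidationException(
                "Target Fluss table "
                        + tablePath
                        + " does not exist and auto.create.table is false.");
    }
    admin.createTable(tablePath, inferredFlussTable, false).get();
} else {
    // Existing table: keep validating primary, bucket, and partition keys.
    TableInfo currentTableInfo = admin.getTableInfo(tablePath).get();
    sanityCheck(inferredFlussTable, currentTableInfo);
}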
{code}
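The proposed control flow can be exercised without a Fluss cluster by using an in-memory stand-in for the admin client. InMemoryCatalog, CreateTableFlow, and MissingTargetTableException are hypothetical names introduced only for this sketch; the real Admin API is asynchronous, and the sanity check on existing tables is reduced to a comment. In this sketch the database is created only on the auto-create path, so a missing table with auto.create.table set to false leaves the catalog untouched.

{code:java}
import java.util.HashSet;
import java.util.Set;

/** Hypothetical in-memory stand-in for the Fluss admin client (synchronous). */
final class InMemoryCatalog {
    final Set<String> databases = new HashSet<>();
    final Set<String> tables = new HashSet<>(); // entries look like "db.table"
}

/** Hypothetical exception playing the role of ValidationException in the proposal. */
final class MissingTargetTableException extends RuntimeException {
    MissingTargetTableException(String message) {
        super(message);
    }
}

final class CreateTableFlow {
    enum Outcome { CREATED, VALIDATED_EXISTING }

    private CreateTableFlow() {}

    static Outcome applyCreateTable(
            InMemoryCatalog catalog, String database, String table, boolean autoCreateTable) {
        String path = database + "." + table;
        if (!catalog.tables.contains(path)) {
            if (!autoCreateTable) {
                // Fail before touching the catalog: nothing is created.
                throw new MissingTargetTableException(
                        "Target Fluss table " + path
                                + " does not exist and auto.create.table is false.");
            }
            catalog.databases.add(database); // create the database if missing
            catalog.tables.add(path);        // create the table
            return Outcome.CREATED;
        }
        // Existing table: the real code would run sanityCheck here.
        return Outcome.VALIDATED_EXISTING;
    }
}
{code}

For example, calling applyCreateTable(catalog, "db", "orders", false) on an empty catalog throws with the message shown in the expected behavior, while the same call with true creates db.orders, and any later call for that table returns VALIDATED_EXISTING.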

h2. Related issues

* FLINK-37677: Sink support skip create table event
* PR #4015 proposed skipping the application of CreateTableEvent to the external 
system while keeping the event in the CDC internal schema state. That PR was 
closed as stale, and it also raised a schema consistency concern.
* FLINK-37837 highlights that CreateTableEvent is foundational for subsequent 
schema handling and should not simply be dropped from CDC internal processing.

h2. Compatibility

The default should preserve the current behavior (auto.create.table: true) to 
avoid breaking existing Fluss CDC pipeline users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
