[ 
https://issues.apache.org/jira/browse/FLINK-39797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39797:
-----------------------------------
    Labels: Flink-CDC fluss pull-request-available  (was: Flink-CDC fluss)

> [Flink CDC][Fluss] Support disabling automatic table creation and fail when 
> target table does not exist
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39797
>                 URL: https://issues.apache.org/jira/browse/FLINK-39797
>             Project: Flink
>          Issue Type: New Feature
>          Components: Flink CDC
>         Environment: Flink CDC pipeline connector: Fluss sink
> Observed in release-3.5 code path and current master behavior around 
> FlussMetaDataApplier.applyCreateTable.
>            Reporter: liting liu
>            Priority: Major
>              Labels: Flink-CDC, fluss, pull-request-available
>
> h2. Motivation
> The Fluss pipeline sink currently always creates the target Fluss 
> database/table when it receives a CreateTableEvent and the target table does 
> not exist.
> In some production environments, target Fluss tables are expected to be 
> managed explicitly by the platform or by users. The CDC job should not 
> silently create target tables because table properties, bucket keys, primary 
> keys, and other Fluss-specific layout choices may need to be reviewed before 
> the job starts writing data.
> This is similar to the motivation in FLINK-37677, but the desired behavior 
> here is stricter for Fluss: when automatic table creation is disabled and the 
> target table does not exist, the job should fail with a clear error instead 
> of creating the table.
> h2. Current behavior
> In the Fluss pipeline sink, FlussMetaDataApplier.applyCreateTable currently 
> does roughly the following:
> {code:java}
> admin.createDatabase(
>     tablePath.getDatabaseName(), DatabaseDescriptor.EMPTY, true);
> if (!admin.tableExists(tablePath).get()) {
>     admin.createTable(tablePath, inferredFlussTable, false).get();
> } else {
>     TableInfo currentTableInfo = admin.getTableInfo(tablePath).get();
>     sanityCheck(inferredFlussTable, currentTableInfo);
> }
> {code}
> As a result, a missing target Fluss table is automatically created. There is 
> no Fluss sink option to turn this behavior off.
> Setting schema.change.behavior: exception does not solve this, because 
> CreateTableEvent is required by the CDC schema pipeline to initialize 
> internal schema state and may still be applied to the sink.
> h2. Expected behavior
> Add a Fluss sink option to control automatic target table creation, for 
> example:
> {code:yaml}
> sink:
>   type: fluss
>   bootstrap.servers: localhost:9123
>   auto.create.table: false
> {code}
> When auto.create.table is false:
> * Flink CDC should still process CreateTableEvent internally so that 
> SchemaRegistry / SchemaOperator can maintain the evolved schema.
> * Fluss sink should not create the target database/table automatically.
> * If the target Fluss table does not exist, the job should fail with a clear 
> ValidationException, for example:
> {code}
> Target Fluss table <database>.<table> does not exist and auto.create.table is 
> false.
> {code}
> If the target table exists, the existing sanityCheck should still run to 
> validate primary keys, bucket keys, and partition keys.
> h2. Proposed implementation
> Add a FlussDataSink option such as auto.create.table (default true for 
> backward compatibility), pass it into FlussMetaDataApplier, and update 
> applyCreateTable:
> {code:java}
> if (!admin.tableExists(tablePath).get()) {
>     if (!autoCreateTable) {
>         throw new ValidationException(
>             "Target Fluss table " + tablePath
>                 + " does not exist and auto.create.table is false.");
>     }
>     admin.createTable(tablePath, inferredFlussTable, false).get();
> } else {
>     TableInfo currentTableInfo = admin.getTableInfo(tablePath).get();
>     sanityCheck(inferredFlussTable, currentTableInfo);
> }
> {code}
> h2. Related issues
> * FLINK-37677: Sink support skip create table event
> * PR #4015 proposed skipping applying CreateTableEvent to the external system 
> while keeping it in the CDC internal schema state. That PR was closed as 
> stale and also raised a schema consistency concern.
> * FLINK-37837 highlights that CreateTableEvent is foundational for subsequent 
> schema handling and should not simply be dropped from CDC internal processing.
> h2. Compatibility
> The default should remain the current behavior, auto.create.table: true, to 
> avoid breaking existing Fluss CDC pipeline users.
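
The branching proposed for applyCreateTable can be exercised in isolation with a minimal, self-contained sketch. Everything in it is a hypothetical stand-in and not the Fluss or Flink CDC API: InMemoryAdmin replaces the Fluss Admin client, the local ValidationException replaces the exception class named in the issue, table paths are plain strings, and the sanity check is reduced to comparing primary keys only.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AutoCreateTableSketch {

    /** Hypothetical stand-in for the ValidationException named in the issue. */
    static class ValidationException extends RuntimeException {
        ValidationException(String message) {
            super(message);
        }
    }

    /** Hypothetical in-memory catalog: table path -> primary key columns. */
    static class InMemoryAdmin {
        private final Map<String, List<String>> tables = new HashMap<>();

        boolean tableExists(String tablePath) {
            return tables.containsKey(tablePath);
        }

        void createTable(String tablePath, List<String> primaryKeys) {
            tables.put(tablePath, primaryKeys);
        }

        List<String> getPrimaryKeys(String tablePath) {
            return tables.get(tablePath);
        }
    }

    /** Mirrors the proposed branching: create, fail, or sanity-check. */
    static void applyCreateTable(
            InMemoryAdmin admin,
            String tablePath,
            List<String> inferredPrimaryKeys,
            boolean autoCreateTable) {
        if (!admin.tableExists(tablePath)) {
            if (!autoCreateTable) {
                throw new ValidationException(
                        "Target Fluss table " + tablePath
                                + " does not exist and auto.create.table is false.");
            }
            admin.createTable(tablePath, inferredPrimaryKeys);
        } else {
            // Simplified sanity check (assumption): compares primary keys only.
            if (!admin.getPrimaryKeys(tablePath).equals(inferredPrimaryKeys)) {
                throw new ValidationException(
                        "Primary keys of existing table " + tablePath
                                + " do not match the inferred schema.");
            }
        }
    }

    public static void main(String[] args) {
        InMemoryAdmin admin = new InMemoryAdmin();

        // Default behavior (auto.create.table = true): missing table is created.
        applyCreateTable(admin, "db.orders", Arrays.asList("id"), true);
        System.out.println("db.orders exists: " + admin.tableExists("db.orders"));

        // auto.create.table = false and table missing: the call fails.
        try {
            applyCreateTable(admin, "db.missing", Arrays.asList("id"), false);
        } catch (ValidationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With the flag left at true the sketch behaves like the current code path, so existing pipelines would see no change. With the flag set to false, a table that already exists still goes through the sanity check and nothing is ever created.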



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
