[
https://issues.apache.org/jira/browse/FLINK-39797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-39797:
-----------------------------------
Labels: Flink-CDC fluss pull-request-available (was: Flink-CDC fluss)
> [Flink CDC][Fluss] Support disabling automatic table creation and fail when
> target table does not exist
> -------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39797
> URL: https://issues.apache.org/jira/browse/FLINK-39797
> Project: Flink
> Issue Type: New Feature
> Components: Flink CDC
> Environment: Flink CDC pipeline connector: Fluss sink
> Observed in release-3.5 code path and current master behavior around
> FlussMetaDataApplier.applyCreateTable.
> Reporter: liting liu
> Priority: Major
> Labels: Flink-CDC, fluss, pull-request-available
>
> h2. Motivation
> The Fluss pipeline sink currently always creates the target Fluss
> database/table when it receives a CreateTableEvent and the target table does
> not exist.
> In some production environments, target Fluss tables are expected to be
> managed explicitly by the platform or by users. The CDC job should not
> silently create target tables because table properties, bucket keys, primary
> keys, and other Fluss-specific layout choices may need to be reviewed before
> the job starts writing data.
> This is similar to the motivation in FLINK-37677, but the desired behavior
> here is stricter for Fluss: when automatic table creation is disabled and the
> target table does not exist, the job should fail with a clear error instead
> of creating the table.
> h2. Current behavior
> In the Fluss pipeline sink, FlussMetaDataApplier.applyCreateTable currently
> does roughly the following:
> {code:java}
> admin.createDatabase(tablePath.getDatabaseName(), DatabaseDescriptor.EMPTY, true);
> if (!admin.tableExists(tablePath).get()) {
>     admin.createTable(tablePath, inferredFlussTable, false).get();
> } else {
>     TableInfo currentTableInfo = admin.getTableInfo(tablePath).get();
>     sanityCheck(inferredFlussTable, currentTableInfo);
> }
> {code}
> As a result, a missing target Fluss table is automatically created. There is
> no Fluss sink option to turn this behavior off.
> Setting schema.change.behavior: exception does not solve this, because
> CreateTableEvent is required by the CDC schema pipeline to initialize
> internal schema state and may still be applied to the sink.
> h2. Expected behavior
> Add a Fluss sink option to control automatic target table creation, for
> example:
> {code:yaml}
> sink:
>   type: fluss
>   bootstrap.servers: localhost:9123
>   auto.create.table: false
> {code}
> When auto.create.table is false:
> * Flink CDC should still process CreateTableEvent internally so that
> SchemaRegistry / SchemaOperator can maintain the evolved schema.
> * The Fluss sink should not create the target database/table automatically.
> * If the target Fluss table does not exist, the job should fail with a clear
> ValidationException, for example:
> {code}
> Target Fluss table <database>.<table> does not exist and auto.create.table is false.
> {code}
> If the target table exists, the existing sanityCheck should still run to
> validate primary keys, bucket keys, and partition keys.
> h2. Proposed implementation
> Add a FlussDataSink option such as auto.create.table (default true for
> backward compatibility), pass it into FlussMetaDataApplier, and update
> applyCreateTable:
> {code:java}
> if (!admin.tableExists(tablePath).get()) {
>     if (!autoCreateTable) {
>         throw new ValidationException(
>                 "Target Fluss table " + tablePath
>                         + " does not exist and auto.create.table is false.");
>     }
>     admin.createTable(tablePath, inferredFlussTable, false).get();
> } else {
>     TableInfo currentTableInfo = admin.getTableInfo(tablePath).get();
>     sanityCheck(inferredFlussTable, currentTableInfo);
> }
> {code}
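To make the three outcomes of the proposed branch easier to reason about (create, fail, validate), here is a minimal self-contained sketch. It is not connector code: AutoCreateGuard, its in-memory table map, and the string-equality check standing in for sanityCheck are all hypothetical, and IllegalStateException is used only so the example runs without Flink CDC on the classpath (the issue proposes ValidationException).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the applyCreateTable decision; not Fluss or Flink CDC API. */
final class AutoCreateGuard {

    /** What happened to the target table when a CreateTableEvent was applied. */
    enum Action { CREATED, VALIDATED }

    // In-memory stand-in for the Fluss catalog: table path -> schema description.
    private final Map<String, String> tables = new HashMap<>();
    private final boolean autoCreateTable;

    AutoCreateGuard(boolean autoCreateTable) {
        this.autoCreateTable = autoCreateTable;
    }

    /** Registers a table as if the platform had created it ahead of time. */
    void preCreate(String tablePath, String schema) {
        tables.put(tablePath, schema);
    }

    Action applyCreateTable(String tablePath, String inferredSchema) {
        String existing = tables.get(tablePath);
        if (existing == null) {
            if (!autoCreateTable) {
                // Assumption: the real sink would throw ValidationException here.
                throw new IllegalStateException(
                        "Target Fluss table " + tablePath
                                + " does not exist and auto.create.table is false.");
            }
            tables.put(tablePath, inferredSchema);
            return Action.CREATED;
        }
        // Stand-in for sanityCheck: compare the inferred schema with the existing one.
        if (!existing.equals(inferredSchema)) {
            throw new IllegalStateException("Schema mismatch for " + tablePath);
        }
        return Action.VALIDATED;
    }
}
```

With new AutoCreateGuard(false), applyCreateTable on a missing table throws, while the same call after preCreate returns VALIDATED. With new AutoCreateGuard(true), a missing table is created, which matches the current behavior.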
> h2. Related issues
> * FLINK-37677: Sink support skip create table event
> * PR #4015 proposed skipping applying CreateTableEvent to the external system
> while keeping it in the CDC internal schema state. That PR was closed as
> stale and also raised a schema consistency concern.
> * FLINK-37837 highlights that CreateTableEvent is foundational for subsequent
> schema handling and should not simply be dropped from CDC internal processing.
> h2. Compatibility
> The default should remain the current behavior (auto.create.table: true) to
> avoid breaking existing Fluss CDC pipeline users.
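The compatibility rule comes down to how an absent option is resolved. The sketch below shows one way to do that with a plain string map. AutoCreateOption and resolve are hypothetical names, and the real connector would presumably declare the option through Flink CDC's configuration classes rather than parse it by hand.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper that resolves auto.create.table from raw sink options. */
final class AutoCreateOption {

    static final String KEY = "auto.create.table";
    // Default stays true so pipelines that never set the option behave as before.
    static final boolean DEFAULT = true;

    private AutoCreateOption() {}

    static boolean resolve(Map<String, String> sinkOptions) {
        String raw = sinkOptions.get(KEY);
        if (raw == null) {
            return DEFAULT;
        }
        String value = raw.trim().toLowerCase(Locale.ROOT);
        if (value.equals("true")) {
            return true;
        }
        if (value.equals("false")) {
            return false;
        }
        // Reject typos instead of silently falling back to a default.
        throw new IllegalArgumentException(
                "Invalid value for " + KEY + ": '" + raw + "' (expected true or false)");
    }
}
```

An existing pipeline definition without the key resolves to true, so only jobs that explicitly set auto.create.table: false get the stricter behavior.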
--
This message was sent by Atlassian Jira
(v8.20.10#820010)