[ https://issues.apache.org/jira/browse/FLINK-39718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ran Tao updated FLINK-39718:
----------------------------
Description:
When a distributed pipeline source (for example Kafka; we have implemented a
Kafka pipeline source within our company, but the problem is general) is used
with the Paimon sink in a distributed topology, the job may fail before the
sink has finished auto-creating the target table.
The failure happens in *DistributedPrePartitionOperator*. The previous
*PaimonHashFunction* rebuilds the hash function at this stage and immediately
accesses the external Paimon catalog to load the target table. However, when
the target table is to be auto-created, the sink-side table creation has not
happened yet, so catalog.getTable(...) throws TableNotExistException and the
job fails in the pre-partition stage.
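To make the failure mode concrete, here is a minimal Java sketch of an eager
catalog lookup. Everything in it is hypothetical: InMemoryCatalog,
EagerHashFunction and the local TableNotExistException are simplified
stand-ins, not the real Paimon or Flink CDC classes. It only shows that a hash
function which loads the table while it is being built fails whenever it is
built before the table exists.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in; NOT Paimon's Catalog.TableNotExistException.
class TableNotExistException extends RuntimeException {
    TableNotExistException(String table) {
        super("Table " + table + " does not exist");
    }
}

// Hypothetical in-memory catalog: table name -> bucket count.
class InMemoryCatalog {
    private final Map<String, Integer> tables = new HashMap<>();

    void createTable(String name, int buckets) {
        tables.put(name, buckets);
    }

    int getTable(String name) {
        Integer buckets = tables.get(name);
        if (buckets == null) {
            throw new TableNotExistException(name);
        }
        return buckets;
    }
}

// Hypothetical model of the old behaviour: the catalog lookup happens as soon
// as the hash function is (re)built, whether or not the table exists yet.
class EagerHashFunction {
    private final int buckets;

    EagerHashFunction(InMemoryCatalog catalog, String table) {
        this.buckets = catalog.getTable(table); // throws if not created yet
    }

    int bucketOf(Object key) {
        // floorMod keeps the bucket index non-negative for negative hash codes
        return Math.floorMod(key.hashCode(), buckets);
    }
}
```

Building EagerHashFunction before createTable(...) throws
TableNotExistException; building it after the table exists succeeds.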
This issue is usually not exposed in MySQL-CDC pipelines because MySQL-CDC uses
the regular topology. In that path, the CreateTableEvent is handled by
SchemaOperator first, and the sink-side MetadataApplier creates the downstream
table before records enter RegularPrePartitionOperator. As a result, the old
PaimonHashFunction can usually find the target table in the catalog.
*A distributed pipeline source behaves differently because it uses the
distributed topology, where pre-partitioning happens before the MetadataApplier
has auto-created the table.*
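The ordering difference can be modeled the same way. The Java sketch below is
again hypothetical: TopologyOrderDemo and its step names are illustrative
labels for the sink-side table creation and the pre-partition lookup, not real
Flink CDC operators. It assumes, as with the old PaimonHashFunction, that
pre-partitioning needs the target table to already be in the catalog.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of operator ordering; not Flink CDC code.
class TopologyOrderDemo {
    // Runs the steps in order; returns false if pre-partitioning runs
    // before the (simulated) catalog contains the target table.
    static boolean run(List<String> steps) {
        Set<String> catalog = new HashSet<>();
        String table = "db.orders";
        for (String step : steps) {
            switch (step) {
                case "MetadataApplier":
                    catalog.add(table); // sink-side auto-creation of the table
                    break;
                case "PrePartition":
                    if (!catalog.contains(table)) {
                        return false; // old hash function would throw here
                    }
                    break;
                default:
                    throw new IllegalArgumentException("Unknown step: " + step);
            }
        }
        return true;
    }
}
```

With the regular order ("MetadataApplier", then "PrePartition") run(...)
returns true; with the distributed order ("PrePartition" first) it returns
false, which corresponds to the TableNotExistException in the real job.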
Paimon and other pipeline sinks that support automatic table creation should
work with the distributed topology, even though there is currently no connector
for a distributed pipeline source.
> [pipeline][paimon] Paimon pipeline sink fails with distributed source when
> target table does not exist
> ------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39718
> URL: https://issues.apache.org/jira/browse/FLINK-39718
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: Ran Tao
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)