[ 
https://issues.apache.org/jira/browse/FLINK-39088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gustavo de Morais updated FLINK-39088:
--------------------------------------
    Description: 
Currently, when a CAST operation is applied to a primary key or upsert key 
column in a streaming query, Flink's optimizer loses track of the key, in some 
cases, even when the cast is provably injective (one-to-one mapping). This 
causes the planner to insert a SinkUpsertMaterializer operator unnecessarily, 
adding state overhead and reducing performance. For example:

CREATE TABLE source (
  id INT PRIMARY KEY NOT ENFORCED,
  name STRING
);
 
CREATE TABLE sink (
  id STRING PRIMARY KEY NOT ENFORCED,
  name STRING  
);
 
INSERT INTO sink SELECT CAST(id AS STRING), name FROM source;
 
In this case, CAST(INT AS STRING) is injective - every distinct integer maps to 
a distinct string representation. The upsert key should be preserved through 
this cast. However, the current implementation treats any cast as potentially 
key-destroying, requiring a SinkUpsertMaterializer to re-establish the key 
before writing to the sink.

Solution:
Extend the key-tracking logic in FlinkRelMdUniqueKeys to recognize injective 
casts as key-preserving operations. An explicit cast is considered injective 
when every distinct input value maps to a distinct output value.

The following explicit casts to VARCHAR/CHAR will be recognized as injective:
 - Integer types: TINYINT, SMALLINT, INTEGER, BIGINT (biggest win)

The following can also be added, but are open to discussion since they are 
more rarely used as unique keys:
 - Floating point types: FLOAT, DOUBLE
 - BOOLEAN
 - DATE
 - Timestamp types: TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_TZ
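
The classification above can be sketched as a simple allow-list check. This is 
only an illustration in Python, not the planner's code: the names 
INTEGER_TYPES, UNDER_DISCUSSION and is_recognized_injective_cast are 
hypothetical, and the sketch deliberately ignores the length of the target 
CHAR/VARCHAR type.

```python
# Hypothetical sketch: which explicit casts to VARCHAR/CHAR would be treated
# as key-preserving. Type names are taken from the lists above; the function
# and constant names are illustrative and do not exist in Flink.

INTEGER_TYPES = frozenset({"TINYINT", "SMALLINT", "INTEGER", "BIGINT"})
UNDER_DISCUSSION = frozenset({
    "FLOAT", "DOUBLE", "BOOLEAN", "DATE",
    "TIMESTAMP", "TIMESTAMP_LTZ", "TIMESTAMP_TZ",
})
STRING_TARGETS = frozenset({"VARCHAR", "CHAR"})


def is_recognized_injective_cast(source_type, target_type, include_optional=False):
    """Return True if the cast is on the allow-list of injective casts.

    A False result means "not recognized", not "proven non-injective".
    Assumption: the target string type is long enough to hold every value.
    """
    source = source_type.upper()
    target = target_type.upper()
    if target not in STRING_TARGETS:
        return False
    if source in INTEGER_TYPES:
        return True
    # The second group is only accepted when explicitly enabled.
    return include_optional and source in UNDER_DISCUSSION
```

With this shape, adding one of the types under discussion later only means 
moving it from one set to the other.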

 

*Also, we currently use isImplicitCasts to check whether the upsert key should 
be preserved.* This is not correct. Several of the implicit casts that we 
support are not necessarily injective, e.g. char(6) to char(3), where two 
distinct inputs can end up as the same output. We'll fix this by properly 
implementing all the supported injective casts.
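
To make the difference concrete, here is a small Python sketch that checks 
whether a cast keeps a set of distinct keys distinct. It is not Flink code: 
preserves_uniqueness and cast_to_char3 are hypothetical helpers, and the 
char(6) to char(3) cast is modelled as plain truncation to three characters, 
which is an assumption made for illustration. Passing the check on a sample 
does not prove injectivity; failing it does disprove it.

```python
# Hypothetical sketch: does a cast keep distinct key values distinct?

def preserves_uniqueness(keys, cast):
    """Return True if `cast` maps the distinct values in `keys` to distinct outputs."""
    distinct = set(keys)
    return len({cast(k) for k in distinct}) == len(distinct)


def cast_to_char3(value):
    # Assumption: CHAR(6) -> CHAR(3) is modelled as truncation to 3 characters.
    return value[:3]


int_keys = [1, 10, 100, -1]
char6_keys = ["abc123", "abc456"]

# Integer -> string: every sampled key keeps its own string representation.
print(preserves_uniqueness(int_keys, str))              # True

# char(6) -> char(3): both keys collapse to "abc", so the key is lost.
print(preserves_uniqueness(char6_keys, cast_to_char3))  # False
```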

  was:
Currently, when a CAST operation is applied to a primary key or upsert key 
column in a streaming query, Flink's optimizer loses track of the key, in some 
cases, even when the cast is provably injective (one-to-one mapping). This 
causes the planner to insert a SinkUpsertMaterializer operator unnecessarily, 
adding state overhead and reducing performance. For example:

CREATE TABLE source (
  id INT PRIMARY KEY NOT ENFORCED,
  name STRING
);
 
CREATE TABLE sink (
  id STRING PRIMARY KEY NOT ENFORCED,
  name STRING  
);
 
INSERT INTO sink SELECT CAST(id AS STRING), name FROM source;
 
In this case, CAST(INT AS STRING) is injective - every distinct integer maps to 
a distinct string representation. The upsert key should be preserved through 
this cast. However, the current implementation treats any cast as potentially 
key-destroying, requiring a SinkUpsertMaterializer to re-establish the key 
before writing to the sink.

Solution:
Extend the key-tracking logic in FlinkRelMdUniqueKeys to recognize injective 
casts as key-preserving operations. An explicit cast is considered injective 
when every distinct input value maps to a distinct output value.

The following explicit casts to VARCHAR/CHAR will be recognized as injective:
 - Integer types: TINYINT, SMALLINT, INTEGER, BIGINT (biggest win)

The following can also be added, but are open to discussion since they are 
more rarely used as unique keys:
 - Floating point types: FLOAT, DOUBLE
 - BOOLEAN
 - DATE
 - Timestamp types: TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_TZ


> Preserve upsert keys for explicit (injective) CAST calls to avoid unnecessary 
> SinkUpsertMaterializer
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39088
>                 URL: https://issues.apache.org/jira/browse/FLINK-39088
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 2.2.0
>            Reporter: Gustavo de Morais
>            Assignee: Gustavo de Morais
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.3.0
>
>
> Currently, when a CAST operation is applied to a primary key or upsert key 
> column in a streaming query, Flink's optimizer loses track of the key, in 
> some cases, even when the cast is provably injective (one-to-one mapping). 
> This causes the planner to insert a SinkUpsertMaterializer operator 
> unnecessarily, adding state overhead and reducing performance. For example:
> CREATE TABLE source (
>   id INT PRIMARY KEY NOT ENFORCED,
>   name STRING
> );
>  
> CREATE TABLE sink (
>   id STRING PRIMARY KEY NOT ENFORCED,
>   name STRING  
> );
>  
> INSERT INTO sink SELECT CAST(id AS STRING), name FROM source;
>  
> In this case, CAST(INT AS STRING) is injective - every distinct integer maps 
> to a distinct string representation. The upsert key should be preserved 
> through this cast. However, the current implementation treats any cast as 
> potentially key-destroying, requiring a SinkUpsertMaterializer to 
> re-establish the key before writing to the sink.
>
> Solution:
> Extend the key-tracking logic in FlinkRelMdUniqueKeys to recognize injective 
> casts as key-preserving operations. An explicit cast is considered injective 
> when every distinct input value maps to a distinct output value.
>
> The following explicit casts to VARCHAR/CHAR will be recognized as injective:
>  - Integer types: TINYINT, SMALLINT, INTEGER, BIGINT (biggest win)
> The following can also be added, but are open to discussion since they are 
> more rarely used as unique keys:
>  - Floating point types: FLOAT, DOUBLE
>  - BOOLEAN
>  - DATE
>  - Timestamp types: TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_TZ
>  
> *Also, we currently use isImplicitCasts to check whether the upsert key 
> should be preserved.* This is not correct. Several of the implicit casts 
> that we support are not necessarily injective, e.g. char(6) to char(3), 
> where two distinct inputs can end up as the same output. We'll fix this by 
> properly implementing all the supported injective casts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
