Gustavo de Morais created FLINK-39088:
-----------------------------------------

             Summary: Preserve upsert keys for explicit (injective) CAST calls to avoid unnecessary SinkUpsertMaterializer
                 Key: FLINK-39088
                 URL: https://issues.apache.org/jira/browse/FLINK-39088
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
    Affects Versions: 2.2.0
            Reporter: Gustavo de Morais
            Assignee: Gustavo de Morais
             Fix For: 2.3.0


Currently, when a CAST is applied to a primary key or upsert key column in a
streaming query, Flink's optimizer loses track of the key, even when the cast
is provably injective (a one-to-one mapping). The planner then inserts an
unnecessary SinkUpsertMaterializer operator, which adds state overhead and
reduces performance. For example:
 
CREATE TABLE source (
  id INT PRIMARY KEY NOT ENFORCED,
  name STRING
) WITH ('connector' = 'kafka', ...);
 
CREATE TABLE sink (
  id STRING PRIMARY KEY NOT ENFORCED,
  name STRING  
) WITH ('connector' = 'jdbc', ...);
 
INSERT INTO sink SELECT CAST(id AS STRING), name FROM source;
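The following Java sketch is a simplified simulation, not Flink code, of why the planner cares whether a cast preserves the key. It models the upsert sink as a map keyed by the cast id, where each row overwrites any earlier row with an equal key. The names UpsertKeyCastDemo, Row, and upsert are hypothetical, and the "first character only" cast is an invented stand-in for a non-injective cast.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;

public class UpsertKeyCastDemo {

    // A source row: "id" is the source primary key, "name" is the payload.
    record Row(int id, String name) {}

    // Simulated upsert sink keyed by the cast id: a later row overwrites an
    // earlier row whenever their cast keys are equal.
    static Map<String, String> upsert(List<Row> rows, IntFunction<String> cast) {
        Map<String, String> sink = new LinkedHashMap<>();
        for (Row row : rows) {
            sink.put(cast.apply(row.id()), row.name());
        }
        return sink;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(new Row(12, "a"), new Row(15, "b"), new Row(7, "c"));

        // Injective, like CAST(INT AS STRING): distinct ids give distinct keys.
        Map<String, String> injective = upsert(rows, id -> Integer.toString(id));

        // Hypothetical non-injective cast that keeps only the first character:
        // ids 12 and 15 both become "1", so row 15 overwrites row 12.
        Map<String, String> lossy = upsert(rows, id -> Integer.toString(id).substring(0, 1));

        System.out.println(injective); // {12=a, 15=b, 7=c}
        System.out.println(lossy);     // {1=b, 7=c}
    }
}
```

With the injective cast every source key keeps its own sink row. With the lossy cast, two distinct source keys collapse onto one sink key, which is the kind of many-to-one mapping the planner must assume is possible whenever it cannot prove that the key survives the cast.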
 
In this case, CAST(INT AS STRING) is injective: every distinct integer maps to
a distinct string representation, so the upsert key should be preserved
through the cast. However, the current implementation treats every cast as
potentially key-destroying and therefore requires a SinkUpsertMaterializer to
re-establish the key before writing to the sink.

Solution: Extend the key-tracking logic in FlinkRelMdUniqueKeys to recognize
injective casts as key-preserving operations. An explicit cast is considered
injective when every distinct input value maps to a distinct output value. The
following explicit casts to VARCHAR/CHAR will be recognized as injective:

 * Integer types: TINYINT, SMALLINT, INTEGER, BIGINT

 * Floating-point types: FLOAT, DOUBLE

 * BOOLEAN

 * DATE

 * Timestamp types: TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_TZ
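A minimal sketch of the proposed rule, assuming a simplified type model. InjectiveCastCheck, TypeRoot, and preservesKey are hypothetical names that mirror the list above; they are not the actual FlinkRelMdUniqueKeys implementation or Flink's type classes. Types not on the list are conservatively treated as key-destroying, and the target's declared length is ignored.

```java
import java.util.EnumSet;
import java.util.Set;

public class InjectiveCastCheck {

    // Hypothetical, simplified stand-in for SQL type roots; not Flink's API.
    enum TypeRoot {
        TINYINT, SMALLINT, INTEGER, BIGINT,
        FLOAT, DOUBLE,
        BOOLEAN,
        DATE,
        TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_TZ,
        DECIMAL, CHAR, VARCHAR
    }

    // Source types whose explicit cast to CHAR/VARCHAR the issue lists as injective.
    private static final Set<TypeRoot> INJECTIVE_TO_STRING = EnumSet.of(
            TypeRoot.TINYINT, TypeRoot.SMALLINT, TypeRoot.INTEGER, TypeRoot.BIGINT,
            TypeRoot.FLOAT, TypeRoot.DOUBLE,
            TypeRoot.BOOLEAN,
            TypeRoot.DATE,
            TypeRoot.TIMESTAMP, TypeRoot.TIMESTAMP_LTZ, TypeRoot.TIMESTAMP_TZ);

    // Returns true if CAST(source AS target) should keep a key column a key.
    // Only explicit casts are considered, matching the issue's scope; the
    // target's length is deliberately ignored in this sketch.
    static boolean preservesKey(TypeRoot source, TypeRoot target, boolean explicit) {
        if (!explicit) {
            return false;
        }
        boolean targetIsString = target == TypeRoot.CHAR || target == TypeRoot.VARCHAR;
        // Unlisted source types (e.g. DECIMAL here) fall through to false.
        return targetIsString && INJECTIVE_TO_STRING.contains(source);
    }

    public static void main(String[] args) {
        System.out.println(preservesKey(TypeRoot.INTEGER, TypeRoot.VARCHAR, true)); // true
        System.out.println(preservesKey(TypeRoot.DECIMAL, TypeRoot.VARCHAR, true)); // false
        System.out.println(preservesKey(TypeRoot.DOUBLE, TypeRoot.INTEGER, true));  // false
    }
}
```

Keeping the rule as an explicit allow-list means any type that has not been reasoned about stays on the safe side: the planner keeps inserting a SinkUpsertMaterializer for it, exactly as it does today.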



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
