[ 
https://issues.apache.org/jira/browse/FLINK-38624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18047930#comment-18047930
 ] 

EMERSON WANG commented on FLINK-38624:
--------------------------------------

Thank you for your suggestion. All of our simple SQL jobs work fine; only this 
complicated one fails. We have already replaced Kafka with datagen and print. 
However, once we removed a dozen columns from the final SQL, the error no 
longer reproduced, so we have not been able to simplify the script any further 
while still reproducing the issue.
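
For readers following along, a minimal sketch of what a datagen/print skeleton 
for this kind of job might look like is given here. It is not the actual job 
and is not known to reproduce the error. The column subset, the OVER query, the 
print_sink table, and the explain_plan helper are assumptions for illustration; 
only the mod_record_ts computed column and its watermark come from the report. 
The PRIMARY KEY clause is left out for brevity.

```python
# Hypothetical skeleton: datagen source, print sink, and an OVER aggregation
# ordered by the rowtime attribute. Only the planner is exercised.

SOURCE_DDL = """
CREATE TABLE mod_code_pt (
    id INT,
    mod_code_pt STRING,
    snr DOUBLE,
    record_ts STRING,
    mod_record_ts AS TO_TIMESTAMP(record_ts),
    WATERMARK FOR mod_record_ts AS mod_record_ts - INTERVAL '60' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'
)
"""

SINK_DDL = """
CREATE TABLE print_sink (
    mod_code_pt STRING,
    mod_record_ts TIMESTAMP(3),
    avg_snr DOUBLE
) WITH (
    'connector' = 'print'
)
"""

# Assumed query shape: the failing rule (StreamPhysicalOverAggregateRule)
# implies an OVER window, but the real query is not shown in the report.
INSERT_SQL = """
INSERT INTO print_sink
SELECT
    mod_code_pt,
    mod_record_ts,
    AVG(snr) OVER (
        PARTITION BY mod_code_pt
        ORDER BY mod_record_ts
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    ) AS avg_snr
FROM mod_code_pt
"""


def explain_plan():
    """Register both tables and return the plan for the INSERT statement.

    The failure in the report is raised while the planner applies its rules,
    so explaining the statement should be enough to trigger planning without
    submitting a job. datagen emits random strings for record_ts, so
    TO_TIMESTAMP would yield NULL at runtime; this sketch is not meant to run
    as a job.
    """
    # Imported lazily so the SQL strings can be inspected without PyFlink.
    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    for ddl in (SOURCE_DDL, SINK_DDL):
        t_env.execute_sql(ddl)
    return t_env.explain_sql(INSERT_SQL)
```

Calling print(explain_plan()) under both Flink 1.20.0 and 2.1.0 would show 
whether a given query shape plans cleanly on each version, which may help when 
narrowing down which part of the full SQL triggers the mismatch.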

> Flink 2.1 Job Failure: Type Mismatch Exception in 
> StreamPhysicalOverAggregateRule (TIMESTAMP(3) vs TIMESTAMP(3) ROWTIME)
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38624
>                 URL: https://issues.apache.org/jira/browse/FLINK-38624
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.1.0
>            Reporter: EMERSON WANG
>            Priority: Major
>         Attachments: flink_example.py, test_data.csv
>
>
> Our PyFlink Table API jobs run successfully in Google Kubernetes Engine (GKE) 
> with Flink 1.19.0 and 1.20.0, but fail with Flink 2.1.0. The log shows the 
> following error:
> java.lang.RuntimeException: Error while applying rule 
> StreamPhysicalOverAggregateRule(in:LOGICAL,out:STREAM_PHYSICAL)
> Caused by: java.lang.IllegalArgumentException: Type mismatch:
> rel rowtype: ... mod_record_ts: TIMESTAMP(3) -> TIMESTAMP(3) *ROWTIME* ...
> The job fails during execution of a statement set, with a type mismatch 
> involving the mod_record_ts field. The error appears to be related to the 
> planner's handling of time attributes in Table SQL jobs.
> The field mod_record_ts is used only for the watermark in the following 
> table:
> CREATE TABLE mod_code_pt (
>     id INT,
>     mod_code_pt STRING,
>     waveform STRING,
>     snr DOUBLE,
>     bits_per_sym DOUBLE,
>     direction STRING,
>     spread_factor INT,
>     record_ts STRING,
>     dt STRING,
>     ts_year INT,
>     ts_month INT,
>     ts_day INT,
>     ts_hr INT,
>     mod_record_ts AS TO_TIMESTAMP(record_ts),
>     PRIMARY KEY (mod_code_pt) NOT ENFORCED,
>     WATERMARK FOR mod_record_ts AS mod_record_ts - INTERVAL '60' SECONDS
> ) WITH (
>     'connector' = 'filesystem',
>     'path' = 'gs://<path>',
>     'format' = 'parquet',
>     'source.monitor-interval' = '1 d'
> )
> Steps to Reproduce:
> 1. Deploy a PyFlink Table API job on GKE with Flink 2.1.0.
> 2. Use table definitions similar to those in the log (see CREATE TABLE 
>    statements).
> 3. Run the job and observe the failure with a type mismatch in the planner.
> Expected Behavior:
> The job should execute successfully, as it does in Flink 1.19.0 and 1.20.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
