[ 
https://issues.apache.org/jira/browse/FLINK-38624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-38624:
------------------------------------
    Description: 
Our PyFlink Table API jobs run successfully in Google Kubernetes Engine (GKE) 
with Flink 1.19.0 and 1.20.0, but fail with Flink 2.1.0. The log shows the 
following error:
{noformat}
java.lang.RuntimeException: Error while applying rule 
StreamPhysicalOverAggregateRule(in:LOGICAL,out:STREAM_PHYSICAL)
Caused by: java.lang.IllegalArgumentException: Type mismatch:
rel rowtype: ... mod_record_ts: TIMESTAMP(3) -> TIMESTAMP(3) *ROWTIME* ...
{noformat}
The job fails while executing a statement set, with a type mismatch on the 
mod_record_ts field. The error appears to be related to the planner's 
handling of time attributes in Table API/SQL jobs.

The computed column mod_record_ts is used only for the watermark in the 
following table:
{code:sql}
CREATE TABLE mod_code_pt (
  id INT,
  mod_code_pt STRING,
  waveform STRING,
  snr DOUBLE,
  bits_per_sym DOUBLE,
  direction STRING,
  spread_factor INT,
  record_ts STRING,
  dt STRING,
  ts_year INT,
  ts_month INT,
  ts_day INT,
  ts_hr INT,
  mod_record_ts AS TO_TIMESTAMP(record_ts),
  PRIMARY KEY (mod_code_pt) NOT ENFORCED,
  WATERMARK FOR mod_record_ts AS mod_record_ts - INTERVAL '60' SECONDS
) WITH (
  'connector' = 'filesystem',
  'path' = 'gs://<path>',
  'format' = 'parquet',
  'source.monitor-interval' = '1 d'
)
{code}
Steps to Reproduce:
1. Deploy a PyFlink Table API job on GKE with Flink 2.1.0.
2. Use table definitions similar to those in the log (see the CREATE TABLE statement above).
3. Run the job and observe the type mismatch failure in the planner.
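The failing job shape can be sketched in PyFlink as follows. This is a minimal illustration assembled from the DDL in this report, not the attached flink_example.py; the 'datagen' connector, the orders/products table names, and the main() wrapper are assumptions for the sketch:

```python
# Minimal PyFlink sketch of the failing shape (an assumption, not the
# attached flink_example.py): a versioned table whose watermark is declared
# on a computed column, used in a temporal join whose result feeds an OVER
# aggregate.

# Versioned side: the watermark sits on the computed column mod_record_ts,
# the field the planner later re-types as TIMESTAMP(3) *ROWTIME*.
PRODUCTS_DDL = """
CREATE TABLE products (
  product_id STRING,
  record_ts STRING,
  mod_record_ts AS TO_TIMESTAMP(record_ts),
  PRIMARY KEY (product_id) NOT ENFORCED,
  WATERMARK FOR mod_record_ts AS mod_record_ts - INTERVAL '60' SECONDS
) WITH ('connector' = 'datagen')
"""

ORDERS_DDL = """
CREATE TABLE orders (
  product_id STRING,
  amount BIGINT,
  order_ts TIMESTAMP(3),
  WATERMARK FOR order_ts AS order_ts - INTERVAL '5' SECONDS
) WITH ('connector' = 'datagen')
"""

# Temporal join feeding an OVER aggregate, mirroring the JUnit reproducer.
QUERY = """
SELECT COUNT(o.amount) OVER (PARTITION BY o.product_id) AS prev_amount
FROM orders AS o
LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p
ON o.product_id = p.product_id
"""


def main():
    # pyflink is only needed when actually submitting the job.
    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    t_env.execute_sql(ORDERS_DDL)
    t_env.execute_sql(PRODUCTS_DDL)
    # On Flink 2.1.0 this fails during plan optimization; see the stack
    # trace above. On 1.19.0/1.20.0 the same statements plan successfully.
    t_env.execute_sql(QUERY).wait()


if __name__ == "__main__":
    main()
```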

Expected Behavior:
The job should execute successfully, as it does in Flink 1.19.0/1.20.0.

Update:
A JUnit test to reproduce the issue:
{code:scala}
@Test
def testTemporalJoinWithWatermarks(): Unit = {
  util.addTable(s"""
                   |CREATE TABLE orders (
                   |  product_id STRING,
                   |  amount BIGINT,
                   |  order_ts TIMESTAMP(3),
                   |  WATERMARK FOR order_ts AS order_ts - INTERVAL '5' SECONDS
                   |) WITH (
                   |  'connector' = 'values'
                   |)
                   |""".stripMargin)

  util.addTable(s"""
                   |CREATE TABLE products (
                   |  product_id STRING,
                   |  record_ts STRING,
                   |  mod_record_ts AS TO_TIMESTAMP(record_ts),
                   |  PRIMARY KEY (product_id) NOT ENFORCED,
                   |  WATERMARK FOR mod_record_ts AS mod_record_ts - INTERVAL '60' SECONDS
                   |) WITH (
                   |  'connector' = 'values'
                   |)
                   |""".stripMargin)

  util.verifyExecPlan(s"""
                         |SELECT count(o.amount) OVER (PARTITION BY o.product_id) AS prev_amount
                         |FROM orders AS o
                         |LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p
                         |ON o.product_id = p.product_id
                         |""".stripMargin)
}
{code}



> Flink 2.1 Job Failure: Type Mismatch Exception in 
> StreamPhysicalOverAggregateRule (TIMESTAMP(3) vs TIMESTAMP(3) ROWTIME)
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38624
>                 URL: https://issues.apache.org/jira/browse/FLINK-38624
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.1.0
>            Reporter: EMERSON WANG
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: flink_example.py
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
