[
https://issues.apache.org/jira/browse/FLINK-29275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tally closed FLINK-29275.
-------------------------
Resolution: Later
> Temporal Table function: Cannot add expression of different type to set
> -----------------------------------------------------------------------
>
> Key: FLINK-29275
> URL: https://issues.apache.org/jira/browse/FLINK-29275
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.15.2
> Reporter: Tally
> Priority: Major
>
> I am using a temporal table function to join two streams like this, but I get
> the error below. Is there any way to solve this?
> {code:java}
> Exception in thread "main" java.lang.AssertionError: Cannot add expression of
> different type to set:
> set type is RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id,
> DECIMAL(32, 2) price, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency,
> TIMESTAMP(3) order_time, TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL proctime,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency0, BIGINT
> conversion_rate, TIMESTAMP(3) update_time, TIMESTAMP_LTZ(3) *PROCTIME*
> proctime0) NOT NULL
> expression type is RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE"
> order_id, DECIMAL(32, 2) price, VARCHAR(2147483647) CHARACTER SET "UTF-16LE"
> currency, TIMESTAMP(3) order_time, TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL
> proctime, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" currency0, BIGINT
> conversion_rate, TIMESTAMP(3) update_time, TIMESTAMP_LTZ(3) *PROCTIME* NOT
> NULL proctime0) NOT NULL
> set is rel#61:LogicalCorrelate.NONE.any.None:
> 0.[NONE].[NONE](left=HepRelVertex#59,right=HepRelVertex#60,correlation=$cor0,joinType=inner,requiredColumns={4})
> expression is LogicalJoin(condition=[__TEMPORAL_JOIN_CONDITION($4, $7,
> __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY($5))], joinType=[inner])
> LogicalProject(order_id=[$0], price=[$1], currency=[$2], order_time=[$3],
> proctime=[PROCTIME()])
> LogicalTableScan(table=[[default_catalog, default_database, orders]])
> LogicalProject(currency=[$0], conversion_rate=[$1], update_time=[$2],
> proctime=[PROCTIME()])
> LogicalTableScan(table=[[default_catalog, default_database,
> currency_rates]])
> {code}
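
The two record types in the assertion are long enough that the mismatch is easy to miss. Read field by field, they differ only in the last field: proctime0 is nullable in the set type and NOT NULL in the expression type. A minimal sketch in plain Java (no Flink dependency) that finds such a difference mechanically is below. The class and method names are hypothetical, and the type strings in main are abbreviated from the trace above, so this is a debugging aid and not a diagnosis of the root cause.

```java
import java.util.ArrayList;
import java.util.List;

public class RecordTypeDiff {

    // Splits "RecordType(f1, f2(1, 2) x, ...) NOT NULL" into its top-level fields.
    // Commas nested inside parentheses, as in DECIMAL(32, 2), do not split a field.
    static List<String> fields(String recordType) {
        int open = recordType.indexOf('(');
        int close = recordType.lastIndexOf(')');
        String body = recordType.substring(open + 1, close);
        List<String> out = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        int depth = 0;
        for (char c : body.toCharArray()) {
            if (c == '(') depth++;
            if (c == ')') depth--;
            if (c == ',' && depth == 0) {
                out.add(cur.toString().trim());
                cur.setLength(0);
            } else {
                cur.append(c);
            }
        }
        out.add(cur.toString().trim());
        return out;
    }

    // Returns one "index: left <> right" entry per position where the fields differ.
    static List<String> diff(String left, String right) {
        List<String> a = fields(left);
        List<String> b = fields(right);
        List<String> out = new ArrayList<>();
        for (int i = 0; i < Math.max(a.size(), b.size()); i++) {
            String x = i < a.size() ? a.get(i) : "<missing>";
            String y = i < b.size() ? b.get(i) : "<missing>";
            if (!x.equals(y)) {
                out.add(i + ": " + x + " <> " + y);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Abbreviated from the trace: only four of the nine fields are kept.
        String setType = "RecordType(DECIMAL(32, 2) price, "
                + "TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL proctime, "
                + "TIMESTAMP(3) update_time, "
                + "TIMESTAMP_LTZ(3) *PROCTIME* proctime0) NOT NULL";
        String exprType = "RecordType(DECIMAL(32, 2) price, "
                + "TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL proctime, "
                + "TIMESTAMP(3) update_time, "
                + "TIMESTAMP_LTZ(3) *PROCTIME* NOT NULL proctime0) NOT NULL";
        // Prints the single differing field (index 3, proctime0).
        diff(setType, exprType).forEach(System.out::println);
    }
}
```

Pasting the full "set type" and "expression type" strings from a trace into main in place of the abbreviated ones should work the same way, as long as each string keeps the RecordType(...) wrapper.
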
> Fact Table:
> {code:sql}
> CREATE TABLE `orders` (
>   order_id STRING,
>   price DECIMAL(32,2),
>   currency STRING,
>   order_time TIMESTAMP(3),
>   proctime AS PROCTIME()
> ) WITH (
>   'properties.bootstrap.servers' = '127.0.0.1:9092',
>   'properties.group.id' = 'test',
>   'scan.topic-partition-discovery.interval' = '10000',
>   'connector' = 'kafka',
>   'format' = 'json',
>   'scan.startup.mode' = 'latest-offset',
>   'topic' = 'test1'
> ) {code}
> Build Table:
> {code:sql}
> CREATE TABLE `currency_rates` (
>   currency STRING,
>   conversion_rate BIGINT,
>   update_time TIMESTAMP(3),
>   proctime AS PROCTIME()
> ) WITH (
>   'properties.bootstrap.servers' = '127.0.0.1:9092',
>   'properties.group.id' = 'test',
>   'scan.topic-partition-discovery.interval' = '10000',
>   'connector' = 'kafka',
>   'format' = 'json',
>   'scan.startup.mode' = 'latest-offset',
>   'topic' = 'test3'
> ) {code}
> How the table function is created:
> {code:java}
> TemporalTableFunction table_rate = tEnv.from("currency_rates")
>     .createTemporalTableFunction("update_time", "currency");
> tEnv.registerFunction("rates", table_rate); {code}
> Join logic:
> {code:sql}
> SELECT
>   order_id,
>   price,
>   s.currency,
>   conversion_rate,
>   order_time
> FROM orders AS o,
>   LATERAL TABLE (rates(o.proctime)) AS s
> WHERE o.currency = s.currency {code}
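
For readers less familiar with what the query above is meant to compute: a processing-time temporal join pairs each order with whatever rate is current for its currency at the moment the order is processed, and an inner join drops orders that have no rate yet. The plain-Java sketch below models only that intended behaviour. It does not use Flink, all class and method names are hypothetical, and the sample values are made up for illustration.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProcTimeJoinSketch {

    // Latest known conversion rate per currency (the join key).
    private final Map<String, Long> latestRate = new HashMap<>();
    // Joined rows as "order_id,price,currency,conversion_rate".
    private final List<String> output = new ArrayList<>();

    // A new row on the rates stream replaces the previous rate for that currency.
    void onRate(String currency, long conversionRate) {
        latestRate.put(currency, conversionRate);
    }

    // An order is joined with the rate that is current when it arrives.
    // Inner-join behaviour: orders without a known rate produce no output.
    void onOrder(String orderId, BigDecimal price, String currency) {
        Long rate = latestRate.get(currency);
        if (rate != null) {
            output.add(orderId + "," + price + "," + currency + "," + rate);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        ProcTimeJoinSketch join = new ProcTimeJoinSketch();
        join.onOrder("o1", new BigDecimal("10.00"), "EUR"); // no rate yet: dropped
        join.onRate("EUR", 114L);
        join.onOrder("o2", new BigDecimal("20.00"), "EUR"); // joined with 114
        join.onRate("EUR", 116L);
        join.onOrder("o3", new BigDecimal("30.00"), "EUR"); // joined with 116
        join.output().forEach(System.out::println);
    }
}
```

Note that in this model the rate is chosen purely by arrival order, which is why update_time plays no role in it.
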
--
This message was sent by Atlassian Jira
(v8.20.10#820010)