yong yang created FLINK-33616:
---------------------------------
Summary: multi lookup join error
Key: FLINK-33616
URL: https://issues.apache.org/jira/browse/FLINK-33616
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.17.1
Reporter: yong yang
stream1 lookup join jdbc1 on ... lookup join jdbc2 on jdbc1.intfield1 =
cast(jdbc2.stringfield2 as int)
This shows the error: "Temporal table join requires an equality condition on fields of
table [default_catalog.default_database.t22]."
Reproduction code:
{code:java}
//代码占位符
package com.yy.flinkSqlJoin
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.Expressions.row
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.types.Row
import java.time.ZoneId;
/**
 * Repro for chained lookup joins: a fact stream lookup-joined against two JDBC
 * dimension tables, where the second join's ON clause needs a CAST.
 *
 * Changelog row kinds:
 *   +I insert
 *   -U update-before
 *   +U update-after
 *   -D retract (sends a null record to Kafka / corresponds to a MySQL delete)
 * Reference: https://www.yuque.com/u430335/qea2i2/kw4qqu
 *
 * Inner/left lookup joins emit append-only output (no retractions), so the sink
 * only needs to support append semantics. The fact-table/dimension-table join
 * key must be declared as the dimension table's PRIMARY KEY in its DDL.
 *
 * Manual test commands:
 *   kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic user_order
 *   kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic user_payment
 *   kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic out
 *
 * Kafka input (orders):
 *   {"order_id":100,"ts":1665367200000} -- step2
 *   {"order_id":101,"ts":1665367200000} -- step6
 *
 * MySQL dimension data (payments):
 *   use db_yy;
 *   create table user_pay (
 *     order_id bigint
 *     ,paymoney bigint
 *     ,primary key (order_id)
 *   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 *   insert into user_pay values(100,111);                -- step1
 *   update user_pay set paymoney=222 where order_id=100; -- step3
 *   insert into user_pay values(101,33);                 -- step4
 *   update user_pay set paymoney=44 where order_id=101;  -- step5
 *
 * Observed retract-stream output (inserts only):
 *   8> (true,+I[100, 2022-10-10T02:00:00Z, 111]) -- after step2. NOTE: a lookup
 *      join matches the dimension table's data as of the fact row's processing
 *      time; the row is emitted even without a match, and later dimension-table
 *      updates do NOT retract or refresh already-emitted results.
 *   (true,+I[101, 2022-10-10T02:00:00Z, 44])     -- after step6.
 * Kafka topic output:
 *   {"order_id":100,"d_timestamp":"2022-10-10 02:00:00Z","paymoney":111}
 *   {"order_id":101,"d_timestamp":"2022-10-10 02:00:00Z","paymoney":44}
 *
 * Semantics:
 *   Lookup joins come in inner/left/full flavors, but only inner and left are
 *   supported (see the commented-out variants at the bottom). A lookup join
 *   reads the latest dimension data at the moment a fact row arrives, and the
 *   dimension table's join field must be the external connector's primary key
 *   (which rules out the Kafka connector).
 */
object LookUpJoinJDBCDemo {

  def main(args: Array[String]): Unit = {
    // Class.forName("com.mysql.cj.jdbc.Driver")

    // Streaming environment initialization (Flink 1.13-style API).
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)
    // Use the China time zone when rendering TIMESTAMP_LTZ values.
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

    // Order (fact) table.
    /*
     Kafka source notes:
       d_timestamp can come from Kafka metadata or from the payload, e.g.:
         d_timestamp TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
       json.fail-on-missing-field:
         whether a missing field skips the field/row or fails the job
         (default false, i.e. fail).
         https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/formats/json/#%e5%a6%82%e4%bd%95%e5%88%9b%e5%bb%ba%e4%b8%80%e5%bc%a0%e5%9f%ba%e4%ba%8e-json-format-%e7%9a%84%e8%a1%a8
       json.ignore-parse-errors:
         whether a parse error skips the field/row or fails the job
         (default false, i.e. fail); a skipped field is set to null.
       The WITH options below configure the Kafka SOURCE table.
    */
    // val UserOrderTableSql =
    //   """
    //     |create table user_order (
    //     |  order_id bigint,
    //     |  ts bigint,
    //     |  d_timestamp as TO_TIMESTAMP_LTZ(ts,3),
    //     |  proctime AS PROCTIME() -- the fact table needs a processing-time attribute; dimension tables do not
    //     |)WITH(
    //     |  'connector' = 'kafka',
    //     |  'topic' = 'user_order',
    //     |  'properties.bootstrap.servers' = 'localhost:9092',
    //     |  'properties.group.id' = 'g1',
    //     |  'scan.startup.mode' = 'latest-offset',
    //     |  'format' = 'json',
    //     |  'json.fail-on-missing-field' = 'false', -- skip or fail when a field is missing
    //     |  'json.ignore-parse-errors' = 'true' -- skip rows that fail to parse
    //     |)
    //     |""".stripMargin
    // tEnv.executeSql(UserOrderTableSql)

    // Inline test data instead of the Kafka source above. The boxing from
    // Scala Int/Long to java.lang.Integer/Long is made explicit for fromValues.
    val table = tEnv.fromValues(
      DataTypes.ROW(
        DataTypes.FIELD("order_id", DataTypes.STRING())
        , DataTypes.FIELD("ts", DataTypes.INT())
        , DataTypes.FIELD("d_timestamp", DataTypes.TIMESTAMP_LTZ(3))
      ),
      row("100", Integer.valueOf(1), java.lang.Long.valueOf(1691722303347L))
      , row("100", Integer.valueOf(2), java.lang.Long.valueOf(1691732303347L))
      , row("100", Integer.valueOf(3), java.lang.Long.valueOf(1691742303347L))
      , row("100", Integer.valueOf(3), java.lang.Long.valueOf(1691752303347L))
    )
    tEnv.createTemporaryView("user_order_pre1", table)
    tEnv.executeSql(
      """
        |create view user_order as select *,proctime() as proctime from user_order_pre1
        |""".stripMargin)
    tEnv.from("user_order").execute().print()

    // Payment table (temporal/dimension table). A dimension table MUST declare
    // a primary key and be a bounded table; the join picks the latest version.
    // The Kafka connector does not support primary keys, so MySQL via JDBC is
    // used here instead.
    val paymentFlow =
      """
        |create table user_pay (
        |  order_id string,
        |  paymoney bigint,
        |  PRIMARY KEY(order_id) NOT ENFORCED
        |)WITH(
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://localhost:3306/db_yy?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',
        |  'table-name' = 'user_pay',
        |  'username' = 'root',
        |  'password' = '123123123'
        |)
        |""".stripMargin
    tEnv.executeSql(paymentFlow)

    // Second JDBC dimension table, used by the second lookup join.
    tEnv.executeSql(
      """
        |create table t22
        |(
        |  id string,
        |  age int,
        |  bi bigint
        |)with(
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://localhost:3306/db_yy?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',
        |  'table-name' = 't',
        |  'username' = 'root',
        |  'password' = '123123123'
        |)
        |""".stripMargin)
    tEnv.executeSql(
      """
        |create view t33 as select *,cast(age as string) as age1 from t22
        |""".stripMargin)

    // Result table. The WITH options below configure the Kafka SINK table.
    val resTableSQL =
      """
        |create table user_res (
        |  order_id bigint,
        |  d_timestamp TIMESTAMP_LTZ(3),
        |  paymoney bigint
        |)WITH(
        |  'connector' = 'kafka',
        |  'topic' = 'out',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'format' = 'json',
        |  'sink.partitioner' = 'default' -- 默认分区器
        |)
        |""".stripMargin
    tEnv.executeSql(resTableSQL)

    // Join and output. `r` is the alias of the dimension table at the fact
    // row's processing time. With an inner join, a fact row is matched against
    // the dimension table as it arrives; matches go downstream, misses are dropped.
    val tb1: Table = tEnv.sqlQuery(
      """
        |select
        |  l.order_id,
        |  l.d_timestamp,
        |  r.paymoney,
        |  r2.age
        |from user_order as l
        |join
        |  user_pay FOR SYSTEM_TIME AS OF l.proctime AS r
        |on l.order_id = r.order_id
        |left join
        |  t22 FOR SYSTEM_TIME AS OF l.proctime AS r2
        |-- on r.order_id = r2.id
        |on r.order_id = cast(r2.age as string) -- error: Temporal table join requires an equality condition on fields of table [default_catalog.default_database.t22].
        |""".stripMargin)
    // NOTE: the dimension-join ON condition between r and r2 must not use CAST
    // (the two sides must have identical types), otherwise: "Temporal table
    // join requires an equality condition on fields of table ..."
    // NOTE: the ON condition between l and r2 must not use CAST either,
    // otherwise: "implicit type conversion between VARCHAR(2147483647) and
    // INTEGER is not supported on join's condition now"
    tEnv.toDataStream(tb1).print()

    // Lookup LEFT join: every fact row is emitted downstream whether or not it
    // matches the external dimension table.
    // val tb1: Table = tEnv.sqlQuery(
    //   """
    //     |select
    //     |  l.order_id,
    //     |  l.d_timestamp,
    //     |  r.paymoney
    //     |from user_order as l left join user_pay FOR SYSTEM_TIME AS OF l.proctime AS r
    //     |on l.order_id = r.order_id
    //     |""".stripMargin)

    /*
     Fails with: Unknown join type LEFT
     Lookup join does not support RIGHT join: the fact table is the driving
     table, which contradicts right-join semantics.
    */
    // val tb1: Table = tEnv.sqlQuery(
    //   """
    //     |select
    //     |  l.order_id,
    //     |  l.d_timestamp,
    //     |  r.paymoney
    //     |from user_order as l right join user_pay FOR SYSTEM_TIME AS OF l.proctime AS r
    //     |on l.order_id = r.order_id
    //     |""".stripMargin)

    /*
     Fails with: Unknown join type FULL
     Lookup join does not support FULL OUTER join: the left side is the driving
     table, so only inner join (flatmap + _.filter.collect) and left join
     (flatmap + collect) are supported.
    */
    // val tb1: Table = tEnv.sqlQuery(
    //   """
    //     |select
    //     |  l.order_id,
    //     |  l.d_timestamp,
    //     |  r.paymoney
    //     |from user_order as l full join user_pay FOR SYSTEM_TIME AS OF l.proctime AS r
    //     |on l.order_id = r.order_id
    //     |""".stripMargin)

    // tEnv.toRetractStream(tb1, classOf[Row]).print()
    // NOTE: the Table variable can be concatenated directly into the SQL string.
    // tEnv.executeSql("insert into user_res select * from " + tb1)

    env.execute("job1")
  }
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)