[
https://issues.apache.org/jira/browse/FLINK-33616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
yong yang closed FLINK-33616.
-----------------------------
Resolution: Done
> multi lookup join error
> -----------------------
>
> Key: FLINK-33616
> URL: https://issues.apache.org/jira/browse/FLINK-33616
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.17.1
> Reporter: yong yang
> Priority: Major
>
> stream1 lookup join jdbc1 on ... lookup join jdbc2 on jdbc1.intfield1 =
> cast(jdbc2.stringfield2 as int)
> shows the error: Temporal table join requires an equality condition on fields
> of table [default_catalog.default_database.t22].
>
> test code:
>
> {code:java}
> //代码占位符
> package com.yy.flinkSqlJoin
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.table.api.Expressions.row
> import org.apache.flink.table.api.{DataTypes, Table}
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
> import org.apache.flink.types.Row
> import java.time.ZoneId;
> /**
> +I 插入
> -U 更新前
> +U 更新后
> -D 撤回消息 会往kafka发一条null 对应mysql删除一条消息.
> * https://www.yuque.com/u430335/qea2i2/kw4qqu
> * 因为inner/left join不会发出回撤流 都是append 所以sink只需要支持append语义即可.
> * 要求事实表维度表关联键key1 必须在维度表的DDL中指定为主键 primary key (key1)
> * 测试使用:
> * kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic
> user_order
> * kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic
> user_payment
> * kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic out
> * kafka数据:
> * 订单:
> * {"order_id":100,"ts":1665367200000} -- step2
> * {"order_id":101,"ts":1665367200000} -- step6
> * 支付(mysql):
> * use db_yy;
> create table user_pay (
> order_id bigint
> ,paymoney bigint
> ,primary key (order_id)
> )ENGINE=InnoDB DEFAULT CHARSET=utf8;
> insert into user_pay values(100,111); -- step1
> update user_pay set paymoney=222 where order_id=100; -- step3
> insert into user_pay values(101,33); -- step4
> update user_pay set paymoney=44 where order_id=101; -- step5
> * 代码回撤流输出(只有insert):
> * 8> (true,+I[100, 2022-10-10T02:00:00Z, 111]) -- step2 之后. 注意:
> lookup join是事实表为准,匹配维度表最新的数据. 没有也输出,维度表如果更新了,不会发回撤流更新结果
> * (true,+I[101, 2022-10-10T02:00:00Z, 44]) -- step6 之后.
> * kafka topic输出:
> * {"order_id":100,"d_timestamp":"2022-10-10 02:00:00Z","paymoney":111}
> * {"order_id":101,"d_timestamp":"2022-10-10 02:00:00Z","paymoney":44}
> *
> * 逻辑:
> * lookup join 也分为 inner join; left join; full join.
> * lookup join是取事实表匹配维度表时的最新的数据. 要求维度表的join字段是外部connector的主键(kafka不行).
> *
> */
> object LookUpJoinJDBCDemo {
> def main(args: Array[String]): Unit = {
> // Class.forName("com.mysql.cj.jdbc.Driver")
> // flink1.13 流处理环境初始化
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = StreamTableEnvironment.create(env)
> // 指定国内时区
> tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
> // 订单表
> /*
> kafka参数:
> d_timestamp 从kafka元数据或者原始数据中获取
> d_timestamp TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
> 参数:json.fail-on-missing-field
> https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/formats/json/#%e5%a6%82%e4%bd%95%e5%88%9b%e5%bb%ba%e4%b8%80%e5%bc%a0%e5%9f%ba%e4%ba%8e-json-format-%e7%9a%84%e8%a1%a8
> 当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)
> 参数: json.ignore-parse-errors
> 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为
> false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
> 注意: 下面 with中的配置是kafka输入表的配置
> */
> // val UserOrderTableSql =
> // """
> // |create table user_order (
> // | order_id bigint,
> // | ts bigint,
> // | d_timestamp as TO_TIMESTAMP_LTZ(ts,3),
> // | proctime AS PROCTIME() -- 事实表需要处理时间,维度表不需要
> // |)WITH(
> // | 'connector' = 'kafka',
> // | 'topic' = 'user_order',
> // | 'properties.bootstrap.servers' = 'localhost:9092',
> // | 'properties.group.id' = 'g1',
> // | 'scan.startup.mode' = 'latest-offset',
> // | 'format' = 'json',
> // | 'json.fail-on-missing-field' = 'false', -- 解析字段缺失 是跳过还是报错.
> // | 'json.ignore-parse-errors' = 'true' -- 跳过解析异常的数据
> // |)
> // |""".stripMargin
> // tEnv.executeSql(UserOrderTableSql)
> // scala int 到 java Integer的隐式转换
> /*
> case class C1(age:Int,name:String,time:Long)
> flink stream 事件时间
> */
> val table = tEnv.fromValues(
> DataTypes.ROW(
> DataTypes.FIELD("order_id", DataTypes.STRING())
> , DataTypes.FIELD("ts", DataTypes.INT())
> , DataTypes.FIELD("d_timestamp", DataTypes.TIMESTAMP_LTZ(3))
> ),
> row("100", Integer.valueOf(1), java.lang.Long.valueOf(1691722303347L))
> , row("100", Integer.valueOf(2), java.lang.Long.valueOf(1691732303347L))
> , row("100", Integer.valueOf(3), java.lang.Long.valueOf(1691742303347L))
> , row("100", Integer.valueOf(3), java.lang.Long.valueOf(1691752303347L))
> )
> tEnv.createTemporaryView("user_order_pre1", table)
> tEnv.executeSql(
> """
> |create view user_order as select *,proctime() as proctime from
> user_order_pre1
> |""".stripMargin)
> tEnv.from("user_order").execute().print()
> // 支付表 时态表 维度表 必须有主键定义. kafka connector不支持主键. 维度表是有界表. join取最新版本.
> 所以这里两种kafka connector都有问题. 这里用mysql测试.
> val paymentFlow =
> """
> |create table user_pay (
> | order_id string,
> | paymoney bigint,
> | PRIMARY KEY(order_id) NOT ENFORCED
> |)WITH(
> | 'connector' = 'jdbc',
> | 'url' =
> 'jdbc:mysql://localhost:3306/db_yy?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',
> | 'table-name' = 'user_pay',
> | 'username' = 'root',
> | 'password' = '123123123'
> |)
> |""".stripMargin
> tEnv.executeSql(paymentFlow)
> tEnv.executeSql(
> """
> |create table t22
> |(
> | id string,
> | age int,
> | bi bigint
> |)with(
> | 'connector' = 'jdbc',
> | 'url' =
> 'jdbc:mysql://localhost:3306/db_yy?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',
> | 'table-name' = 't',
> | 'username' = 'root',
> | 'password' = '123123123'
> |)
> |""".stripMargin)
> tEnv.executeSql(
> """
> |create view t33 as select *,cast(age as string) as age1 from t22
> |""".stripMargin)
> // 结果表
> /*
> 注意: 下面 with中的配置是kafka输出表的配置
> */
> val resTableSQL =
> """
> |create table user_res (
> | order_id bigint,
> | d_timestamp TIMESTAMP_LTZ(3),
> | paymoney bigint
> |)WITH(
> | 'connector' = 'kafka',
> | 'topic' = 'out',
> | 'properties.bootstrap.servers' = 'localhost:9092',
> | 'format' = 'json',
> | 'sink.partitioner' = 'default' -- 默认分区器
> |)
> |""".stripMargin
> tEnv.executeSql(resTableSQL)
> // 关联表并输出 注意: r是维度表关联处理时间后的表别名 inner join 事实表流来了去维度表匹配,匹配到才发往下游,匹配不到则丢掉
> val tb1: Table = tEnv.sqlQuery(
> """
> |select
> | l.order_id,
> | l.d_timestamp,
> | r.paymoney,
> | r2.age
> |from user_order as l
> |join
> | user_pay FOR SYSTEM_TIME AS OF l.proctime AS r
> |on l.order_id = r.order_id
> |left join
> | t22 FOR SYSTEM_TIME AS OF l.proctime AS r2
> |-- on r.order_id = r2.id
> |on r.order_id = cast(r2.age as string) -- error: Temporal table join
> requires an equality condition on fields of table
> [default_catalog.default_database.t22].
> |""".stripMargin)
> // 特别注意: 这里维表join on的条件(r1 r2)不能有cast来类型转换 不能on的两侧类型不同,否则报错: Temporal
> table join requires an equality condition on fields of table
> // 特别注意: 这里维表join on的条件(l r2)不能有cast来类型转换 不能on的两侧类型不同,否则报错: implicit type
> conversion between VARCHAR(2147483647) and INTEGER is not supported on join's
> condition now
> tEnv.toDataStream(tb1).print()
> // lookup join 之 left join; 事实表流 来数据去外部维度表匹配 无论是否匹配到 都会发往下游.
> // val tb1: Table = tEnv.sqlQuery(
> // """
> // |select
> // | l.order_id,
> // | l.d_timestamp,
> // | r.paymoney
> // |from user_order as l left join user_pay FOR SYSTEM_TIME AS OF
> l.proctime AS r
> // |on l.order_id = r.order_id
> // |""".stripMargin)
> /*
> 报错: Unknown join type LEFT
> lookup join 不支持right join. 因为事实表是驱动表,和right join的逻辑不符合.
> */
> // val tb1: Table = tEnv.sqlQuery(
> // """
> // |select
> // | l.order_id,
> // | l.d_timestamp,
> // | r.paymoney
> // |from user_order as l right join user_pay FOR SYSTEM_TIME AS OF
> l.proctime AS r
> // |on l.order_id = r.order_id
> // |""".stripMargin)
> /*
> 报错: Unknown join type FULL
> look up join 不支持 full outer join. 因为左侧是驱动表 所以只支持 inner join(flatmap +
> _.filter.collect) 和 left join(flatmap + collect)
> */
> // val tb1: Table = tEnv.sqlQuery(
> // """
> // |select
> // | l.order_id,
> // | l.d_timestamp,
> // | r.paymoney
> // |from user_order as l full join user_pay FOR SYSTEM_TIME AS OF
> l.proctime AS r
> // |on l.order_id = r.order_id
> // |""".stripMargin)
> // tEnv.toRetractStream(tb1, classOf[Row]).print()
> // 注意 这里传入Table类型的变量名即可
> // tEnv.executeSql("insert into user_res select * from " + tb1)
> env.execute("job1")
> }
> }
> {code}
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)