yong yang created FLINK-33616:
---------------------------------

             Summary: multi lookup join error
                 Key: FLINK-33616
                 URL: https://issues.apache.org/jira/browse/FLINK-33616
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.17.1
            Reporter: yong yang
A query that chains two lookup joins, where the second join condition casts a field of the second dimension table:

stream1 lookup join jdbc1 on ... lookup join jdbc2 on jdbc1.intfield1 = cast(jdbc2.stringfield2 as int)

fails with:

Temporal table join requires an equality condition on fields of table [default_catalog.default_database.t22].
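As the error message suggests, the planner seems to require the lookup-join condition to be an equality on plain physical fields of the lookup table (here t22), so a cast on the t22 side prevents a lookup key from being derived. A possible rewrite, sketched below and not verified against 1.17.1, moves the cast onto the probe side in a derived table so that r2.age stays a bare field (tb1Rewritten and pay_order_id_int are names introduced only for this sketch):

{code:java}
// Sketch of a possible workaround (untested): compute the cast on the probe side,
// so the ON condition on t22 compares plain fields of matching types.
val tb1Rewritten: Table = tEnv.sqlQuery(
  """
    |select j.order_id, j.d_timestamp, j.paymoney, r2.age
    |from (
    |  select l.order_id, l.d_timestamp, l.proctime, r.paymoney,
    |         cast(r.order_id as int) as pay_order_id_int -- cast moved off the lookup side
    |  from user_order as l
    |  join user_pay FOR SYSTEM_TIME AS OF l.proctime AS r
    |  on l.order_id = r.order_id
    |) j
    |left join t22 FOR SYSTEM_TIME AS OF j.proctime AS r2
    |on j.pay_order_id_int = r2.age -- bare fields of matching types on both sides
    |""".stripMargin)
{code}

This assumes the processing-time attribute l.proctime survives the inner projection, which it should as long as it is selected unchanged.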
Test code:

{code:java}
package com.yy.flinkSqlJoin

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.Expressions.row
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.types.Row

import java.time.ZoneId

/** +I insert, -U update-before, +U update-after, -D retraction (sends a null record to kafka, which corresponds to a delete in mysql).
 * https://www.yuque.com/u430335/qea2i2/kw4qqu
 * Inner/left lookup joins emit an append-only stream (no retractions), so the sink only needs to support append semantics.
 * The key key1 joining the fact table and the dimension table must be declared as the primary key in the dimension table DDL: primary key (key1).
 * For manual testing:
 * kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic user_order
 * kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic user_payment
 * kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic out
 * kafka test data:
 * orders:
 * {"order_id":100,"ts":1665367200000} -- step2
 * {"order_id":101,"ts":1665367200000} -- step6
 * payments (mysql):
 * use db_yy;
 * create table user_pay (
 *   order_id bigint
 *   ,paymoney bigint
 *   ,primary key (order_id)
 * )ENGINE=InnoDB DEFAULT CHARSET=utf8;
 * insert into user_pay values(100,111);                -- step1
 * update user_pay set paymoney=222 where order_id=100; -- step3
 * insert into user_pay values(101,33);                 -- step4
 * update user_pay set paymoney=44 where order_id=101;  -- step5
 * retract-stream output of the job (inserts only):
 * 8> (true,+I[100, 2022-10-10T02:00:00Z, 111]) -- after step2. Note: a lookup join is driven by the fact table and matches the latest dimension data at probe time; it emits even without a match, and a later dimension update does not retract earlier results.
 * (true,+I[101, 2022-10-10T02:00:00Z, 44])     -- after step6.
 * kafka topic output:
 * {"order_id":100,"d_timestamp":"2022-10-10 02:00:00Z","paymoney":111}
 * {"order_id":101,"d_timestamp":"2022-10-10 02:00:00Z","paymoney":44}
 *
 * Semantics:
 * a lookup join can likewise be written as inner join, left join or full join.
 * A lookup join matches fact-table rows against the latest version of the dimension table and requires the join field to be the external connector's primary key (kafka does not qualify).
 */
object LookUpJoinJDBCDemo {
  def main(args: Array[String]): Unit = {
    // Class.forName("com.mysql.cj.jdbc.Driver")
    // streaming environment initialization (written against flink 1.13)
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)
    // set the local time zone to China
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

    // order table
    /*
      kafka options:
        d_timestamp can be taken from the kafka metadata or from the raw payload:
          d_timestamp TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
        json.fail-on-missing-field
          https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/formats/json/#%e5%a6%82%e4%bd%95%e5%88%9b%e5%bb%ba%e4%b8%80%e5%bc%a0%e5%9f%ba%e4%ba%8e-json-format-%e7%9a%84%e8%a1%a8
          on a missing field, skip the field/row or fail (default false, i.e. fail)
        json.ignore-parse-errors
          on a parse error, skip the field/row or fail (default false, i.e. fail); a skipped field is set to null
        note: the options in the WITH clause below configure the kafka source table
    */
    // val UserOrderTableSql =
    //   """
    //     |create table user_order (
    //     |  order_id bigint,
    //     |  ts bigint,
    //     |  d_timestamp as TO_TIMESTAMP_LTZ(ts,3),
    //     |  proctime AS PROCTIME() -- the fact table needs a processing-time attribute; the dimension table does not
    //     |)WITH(
    //     |  'connector' = 'kafka',
    //     |  'topic' = 'user_order',
    //     |  'properties.bootstrap.servers' = 'localhost:9092',
    //     |  'properties.group.id' = 'g1',
    //     |  'scan.startup.mode' = 'latest-offset',
    //     |  'format' = 'json',
    //     |  'json.fail-on-missing-field' = 'false', -- skip missing fields instead of failing
    //     |  'json.ignore-parse-errors' = 'true'     -- skip records that fail to parse
    //     |)
    //     |""".stripMargin
    // tEnv.executeSql(UserOrderTableSql)

    // implicit conversion from scala Int to java Integer
    /*
      case class C1(age:Int,name:String,time:Long)
      flink stream event time
    */
    val table = tEnv.fromValues(
      DataTypes.ROW(
        DataTypes.FIELD("order_id", DataTypes.STRING())
        , DataTypes.FIELD("ts", DataTypes.INT())
        , DataTypes.FIELD("d_timestamp", DataTypes.TIMESTAMP_LTZ(3))
      ),
      row("100", Integer.valueOf(1), java.lang.Long.valueOf(1691722303347L))
      , row("100", Integer.valueOf(2), java.lang.Long.valueOf(1691732303347L))
      , row("100", Integer.valueOf(3), java.lang.Long.valueOf(1691742303347L))
      , row("100", Integer.valueOf(3), java.lang.Long.valueOf(1691752303347L))
    )
    tEnv.createTemporaryView("user_order_pre1", table)
    tEnv.executeSql(
      """
        |create view user_order as select *,proctime() as proctime from user_order_pre1
        |""".stripMargin)
    tEnv.from("user_order").execute().print()

    // payment table: a temporal (dimension) table must declare a primary key, which the kafka connector does not support; the dimension table is bounded and the join reads its latest version, so both kafka connectors are unsuitable here and mysql is used for the test
    val paymentFlow =
      """
        |create table user_pay (
        |  order_id string,
        |  paymoney bigint,
        |  PRIMARY KEY(order_id) NOT ENFORCED
        |)WITH(
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://localhost:3306/db_yy?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',
        |  'table-name' = 'user_pay',
        |  'username' = 'root',
        |  'password' = '123123123'
        |)
        |""".stripMargin
    tEnv.executeSql(paymentFlow)

    tEnv.executeSql(
      """
        |create table t22
        |(
        |  id string,
        |  age int,
        |  bi bigint
        |)with(
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://localhost:3306/db_yy?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',
        |  'table-name' = 't',
        |  'username' = 'root',
        |  'password' = '123123123'
        |)
        |""".stripMargin)
    tEnv.executeSql(
      """
        |create view t33 as select *,cast(age as string) as age1 from t22
        |""".stripMargin)

    // result table
    /* note: the options in the WITH clause below configure the kafka sink table */
    val resTableSQL =
      """
        |create table user_res (
        |  order_id bigint,
        |  d_timestamp TIMESTAMP_LTZ(3),
        |  paymoney bigint
        |)WITH(
        |  'connector' = 'kafka',
        |  'topic' = 'out',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'format' = 'json',
        |  'sink.partitioner' = 'default' -- default partitioner
        |)
        |""".stripMargin
    tEnv.executeSql(resTableSQL)

    // join and emit. note: r is the alias of the dimension table as of processing time; with inner join, each incoming fact record probes the dimension table and is emitted downstream only on a match, otherwise dropped
    val tb1: Table = tEnv.sqlQuery(
      """
        |select
        |  l.order_id,
        |  l.d_timestamp,
        |  r.paymoney,
        |  r2.age
        |from user_order as l
        |join
        |  user_pay FOR SYSTEM_TIME AS OF l.proctime AS r
        |on l.order_id = r.order_id
        |left join
        |  t22 FOR SYSTEM_TIME AS OF l.proctime AS r2
        |-- on r.order_id = r2.id
        |on r.order_id = cast(r2.age as string) -- error: Temporal table join requires an equality condition on fields of table [default_catalog.default_database.t22].
        |""".stripMargin)
    // NOTE: the ON condition between r and r2 must not cast the dimension-table field; otherwise: Temporal table join requires an equality condition on fields of table
    // NOTE: the ON condition between l and r2 must not compare fields of different types; otherwise: implicit type conversion between VARCHAR(2147483647) and INTEGER is not supported on join's condition now
    tEnv.toDataStream(tb1).print()

    // lookup join as left join: each incoming fact record probes the external dimension table and is emitted downstream whether or not it matches
    // val tb1: Table = tEnv.sqlQuery(
    //   """
    //     |select
    //     |  l.order_id,
    //     |  l.d_timestamp,
    //     |  r.paymoney
    //     |from user_order as l left join user_pay FOR SYSTEM_TIME AS OF l.proctime AS r
    //     |on l.order_id = r.order_id
    //     |""".stripMargin)

    /*
      error: Unknown join type LEFT
      lookup join does not support right join: the fact table is the driving table, which conflicts with right-join semantics
    */
    // val tb1: Table = tEnv.sqlQuery(
    //   """
    //     |select
    //     |  l.order_id,
    //     |  l.d_timestamp,
    //     |  r.paymoney
    //     |from user_order as l right join user_pay FOR SYSTEM_TIME AS OF l.proctime AS r
    //     |on l.order_id = r.order_id
    //     |""".stripMargin)

    /*
      error: Unknown join type FULL
      lookup join does not support full outer join: the left side drives the join, so only inner join (flatmap + _.filter.collect) and left join (flatmap + collect) are supported
    */
    // val tb1: Table = tEnv.sqlQuery(
    //   """
    //     |select
    //     |  l.order_id,
    //     |  l.d_timestamp,
    //     |  r.paymoney
    //     |from user_order as l full join user_pay FOR SYSTEM_TIME AS OF l.proctime AS r
    //     |on l.order_id = r.order_id
    //     |""".stripMargin)
    // tEnv.toRetractStream(tb1, classOf[Row]).print() // note: pass in the Table-typed variable name here
    // tEnv.executeSql("insert into user_res select * from " + tb1)

    env.execute("job1")
  }
}
{code}
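For completeness, the second error quoted in the comments above appears when the two sides of the ON condition simply have different types and no cast at all, as the notes in the test code describe for the (l, r2) pair. A minimal illustration (again a sketch, untested; tbMismatch is a name introduced here), joining the STRING key directly against the INT column:

{code:java}
// Joining STRING order_id against INT age without a cast is reported to fail with:
// "implicit type conversion between VARCHAR(2147483647) and INTEGER is not supported on join's condition now"
val tbMismatch: Table = tEnv.sqlQuery(
  """
    |select l.order_id, r2.age
    |from user_order as l
    |left join t22 FOR SYSTEM_TIME AS OF l.proctime AS r2
    |on l.order_id = r2.age
    |""".stripMargin)
{code}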
--
This message was sent by Atlassian Jira
(v8.20.10#820010)