yong yang created FLINK-33616:
---------------------------------

             Summary: multi lookup join error
                 Key: FLINK-33616
                 URL: https://issues.apache.org/jira/browse/FLINK-33616
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.17.1
            Reporter: yong yang


stream1 lookup join jdbc1 on ... lookup join jdbc2 on jdbc1.intfield1 = 
cast(jdbc2.stringfield2 as int)

show error: Temporal table join requires an equality condition on fields of 
table [default_catalog.default_database.t22].

 

test code:

 
{code:java}
//代码占位符
package com.yy.flinkSqlJoin

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.Expressions.row
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.types.Row

import java.time.ZoneId;

/**
+I  插入
-U  更新前
+U  更新后
-D  撤回消息  会往kafka发一条null 对应mysql删除一条消息.
 * https://www.yuque.com/u430335/qea2i2/kw4qqu
 * 因为inner/left join不会发出回撤流 都是append 所以sink只需要支持append语义即可.
 * 要求事实表维度表关联键key1 必须在维度表的DDL中指定为主键 primary key (key1)
 * 测试使用:
 *    kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic user_order
 *    kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic 
user_payment
 *    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic out
 * kafka数据:
 *    订单:
 *      {"order_id":100,"ts":1665367200000}  -- step2
 *      {"order_id":101,"ts":1665367200000}  -- step6
 *    支付(mysql):
 *      use db_yy;
        create table user_pay (
                        order_id bigint
                      ,paymoney bigint
                      ,primary key (order_id)
                      )ENGINE=InnoDB DEFAULT CHARSET=utf8;

        insert into user_pay values(100,111); -- step1
        update user_pay set paymoney=222 where order_id=100; -- step3
        insert into user_pay values(101,33);  -- step4
        update user_pay set paymoney=44 where order_id=101; -- step5

 *    代码回撤流输出(只有insert):
 *        8> (true,+I[100, 2022-10-10T02:00:00Z, 111]) -- step2 之后.  注意: lookup 
join是事实表为准,匹配维度表最新的数据. 没有也输出,维度表如果更新了,不会发回撤流更新结果
 *        (true,+I[101, 2022-10-10T02:00:00Z, 44]) -- step6 之后.
 *    kafka topic输出:
 *        {"order_id":100,"d_timestamp":"2022-10-10 02:00:00Z","paymoney":111}
 *        {"order_id":101,"d_timestamp":"2022-10-10 02:00:00Z","paymoney":44}
 *
 *    逻辑:
 *        lookup join 也分为 inner join; left join; full join.
 *        lookup join是取事实表匹配维度表时的最新的数据. 要求维度表的join字段是外部connector的主键(kafka不行).
 *
 */
object LookUpJoinJDBCDemo {


  def main(args: Array[String]): Unit = {
//    Class.forName("com.mysql.cj.jdbc.Driver")
    // flink1.13 流处理环境初始化
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val tEnv = StreamTableEnvironment.create(env)

    // 指定国内时区
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

    // 订单表
    /*
    kafka参数:
     d_timestamp 从kafka元数据或者原始数据中获取
        d_timestamp TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
     参数:json.fail-on-missing-field  
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/formats/json/#%e5%a6%82%e4%bd%95%e5%88%9b%e5%bb%ba%e4%b8%80%e5%bc%a0%e5%9f%ba%e4%ba%8e-json-format-%e7%9a%84%e8%a1%a8
        当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)
     参数: json.ignore-parse-errors
        当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
     注意: 下面 with中的配置是kafka输入表的配置
     */
//    val UserOrderTableSql =
//      """
//        |create table user_order (
//        | order_id bigint,
//        | ts bigint,
//        | d_timestamp as TO_TIMESTAMP_LTZ(ts,3),
//        | proctime AS PROCTIME() -- 事实表需要处理时间,维度表不需要
//        |)WITH(
//        |    'connector' = 'kafka',
//        |      'topic' = 'user_order',
//        |      'properties.bootstrap.servers' = 'localhost:9092',
//        |      'properties.group.id' = 'g1',
//        |      'scan.startup.mode' = 'latest-offset',
//        |      'format' = 'json',
//        |      'json.fail-on-missing-field' = 'false', -- 解析字段缺失 是跳过还是报错.
//        |      'json.ignore-parse-errors' = 'true' -- 跳过解析异常的数据
//        |)
//        |""".stripMargin
//    tEnv.executeSql(UserOrderTableSql)

    // scala int 到 java Integer的隐式转换

    /*
     case class C1(age:Int,name:String,time:Long)
     flink stream 事件时间
     */


    val table = tEnv.fromValues(
      DataTypes.ROW(
        DataTypes.FIELD("order_id", DataTypes.STRING())
        , DataTypes.FIELD("ts", DataTypes.INT())
        , DataTypes.FIELD("d_timestamp", DataTypes.TIMESTAMP_LTZ(3))
      ),
      row("100", Integer.valueOf(1), java.lang.Long.valueOf(1691722303347L))
      , row("100", Integer.valueOf(2), java.lang.Long.valueOf(1691732303347L))
      , row("100", Integer.valueOf(3), java.lang.Long.valueOf(1691742303347L))
      , row("100", Integer.valueOf(3), java.lang.Long.valueOf(1691752303347L))
    )
    tEnv.createTemporaryView("user_order_pre1", table)
    tEnv.executeSql(
      """
        |create view user_order as select *,proctime() as proctime from 
user_order_pre1
        |""".stripMargin)

    tEnv.from("user_order").execute().print()



    // 支付表 时态表 维度表 必须有主键定义. kafka connector不支持主键. 维度表是有界表. join取最新版本. 
所以这里两种kafka connector都有问题. 这里用mysql测试.
    val paymentFlow =
      """
        |create table user_pay (
        | order_id string,
        | paymoney bigint,
        | PRIMARY KEY(order_id) NOT ENFORCED
        |)WITH(
        |    'connector' = 'jdbc',
        |    'url' = 
'jdbc:mysql://localhost:3306/db_yy?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',
        |    'table-name' = 'user_pay',
        |    'username' = 'root',
        |    'password' = '123123123'
        |)
        |""".stripMargin
    tEnv.executeSql(paymentFlow)

    tEnv.executeSql(
      """
        |create table t22
        |(
        |    id  string,
        |    age int,
        |    bi bigint
        |)with(
        |   'connector' = 'jdbc',
        |    'url' = 
'jdbc:mysql://localhost:3306/db_yy?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',
        |    'table-name' = 't',
        |    'username' = 'root',
        |    'password' = '123123123'
        |)
        |""".stripMargin)

    tEnv.executeSql(
      """
        |create view t33 as select *,cast(age as string) as age1 from t22
        |""".stripMargin)

    // 结果表
    /*
    注意: 下面 with中的配置是kafka输出表的配置
     */
    val resTableSQL =
      """
        |create table user_res (
        | order_id bigint,
        | d_timestamp TIMESTAMP_LTZ(3),
        | paymoney bigint
        |)WITH(
        |    'connector' = 'kafka',
        |      'topic' = 'out',
        |      'properties.bootstrap.servers' = 'localhost:9092',
        |      'format' = 'json',
        |      'sink.partitioner' = 'default' -- 默认分区器
        |)
        |""".stripMargin
    tEnv.executeSql(resTableSQL)

    // 关联表并输出 注意: r是维度表关联处理时间后的表别名 inner join 事实表流来了去维度表匹配,匹配到才发往下游,匹配不到则丢掉
    val tb1: Table = tEnv.sqlQuery(
      """
        |select
        |   l.order_id,
        |   l.d_timestamp,
        |   r.paymoney,
        |   r2.age
        |from user_order as l
        |join
        |   user_pay FOR SYSTEM_TIME AS OF l.proctime AS r
        |on l.order_id = r.order_id
        |left join
        |   t22 FOR SYSTEM_TIME AS OF l.proctime AS r2
        |-- on r.order_id = r2.id
        |on r.order_id = cast(r2.age as string) -- error: Temporal table join 
requires an equality condition on fields of table 
[default_catalog.default_database.t22].
        |""".stripMargin)
    // 特别注意: 这里维表join on的条件(r1 r2)不能有cast来类型转换 不能on的两侧类型不同,否则报错: Temporal table 
join requires an equality condition on fields of table
    // 特别注意: 这里维表join on的条件(l r2)不能有cast来类型转换 不能on的两侧类型不同,否则报错: implicit type 
conversion between VARCHAR(2147483647) and INTEGER is not supported on join's 
condition now

    tEnv.toDataStream(tb1).print()

    // lookup join 之 left join;  事实表流 来数据去外部维度表匹配 无论是否匹配到 都会发往下游.
//    val tb1: Table = tEnv.sqlQuery(
//      """
//        |select
//        |   l.order_id,
//        |   l.d_timestamp,
//        |   r.paymoney
//        |from user_order as l left join user_pay FOR SYSTEM_TIME AS OF 
l.proctime AS r
//        |on l.order_id = r.order_id
//        |""".stripMargin)


    /*
    报错: Unknown join type LEFT
    lookup join 不支持right join. 因为事实表是驱动表,和right join的逻辑不符合.
     */
//    val tb1: Table = tEnv.sqlQuery(
//      """
//        |select
//        |   l.order_id,
//        |   l.d_timestamp,
//        |   r.paymoney
//        |from user_order as l right join user_pay FOR SYSTEM_TIME AS OF 
l.proctime AS r
//        |on l.order_id = r.order_id
//        |""".stripMargin)

        /*
         报错: Unknown join type FULL
        look up join 不支持 full outer join. 因为左侧是驱动表 所以只支持 inner join(flatmap + 
_.filter.collect) 和 left join(flatmap + collect)
         */
//        val tb1: Table = tEnv.sqlQuery(
//          """
//            |select
//            |   l.order_id,
//            |   l.d_timestamp,
//            |   r.paymoney
//            |from user_order as l full join user_pay FOR SYSTEM_TIME AS OF 
l.proctime AS r
//            |on l.order_id = r.order_id
//            |""".stripMargin)

//    tEnv.toRetractStream(tb1, classOf[Row]).print()

    // 注意 这里传入Table类型的变量名即可
//    tEnv.executeSql("insert into user_res select * from " + tb1)


    env.execute("job1")
  }

}
 {code}
 

 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to