yutao created FLINK-28111:
-----------------------------

             Summary: Flink SQL with HiveCatalog causes UNION ALL to lose the event-time attribute
                 Key: FLINK-28111
                 URL: https://issues.apache.org/jira/browse/FLINK-28111
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API, Table SQL / Planner, Table SQL / Runtime
    Affects Versions: 1.14.4, 1.14.3, 1.14.2, 1.13.5, 1.12.7, 1.13.3, 1.12.5, 1.12.4
         Environment: flink 1.12.4
hadoop 2.6.5
hive 1.1.0
            Reporter: yutao


In my scenario I have two topics with the same schema. I register them as tables with an event-time attribute, then create a view that UNION ALLs the two tables and run a tumbling-window GROUP BY over that view. But as soon as a HiveCatalog is set, the SQL can no longer run and fails with:

Exception in thread "main" org.apache.flink.table.api.TableException: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.

*The complete code is as follows:*

{code:java}
package com.unicom.test;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

/**
 * @author yt
 */
public class DataGenAndPrintSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        EnvironmentSettings envSetting = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSetting);

        String defaultDatabase = "dc_dw";
        String catalogName = "dc_catalog";
        HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase,
                "hdfs://beh/flink/hive/conf", "1.1.0");

        tableEnv.registerCatalog(catalogName, hive);
        tableEnv.useCatalog(catalogName);
        tableEnv.useDatabase(defaultDatabase);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        String sourceDDL = "CREATE TABLE IF NOT EXISTS source_table (\n" +
                "  -- dimension data\n" +
                "  order_id STRING,\n" +
                "  -- user id\n" +
                "  user_id BIGINT,\n" +
                "  -- price\n" +
                "  price BIGINT,\n" +
                "  -- event timestamp\n" +
                "  row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n" +
                "  -- watermark definition\n" +
                "  WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +
                ") WITH (\n" +
                "  'connector' = 'datagen',\n" +
                "  'rows-per-second' = '10',\n" +
                "  'fields.order_id.length' = '1',\n" +
                "  'fields.user_id.min' = '1',\n" +
                "  'fields.user_id.max' = '100000',\n" +
                "  'fields.price.min' = '1',\n" +
                "  'fields.price.max' = '100000'\n" +
                ")";

        String sourceDDL_2 = "CREATE TABLE IF NOT EXISTS source_table_2 (\n" +
                "  -- dimension data\n" +
                "  order_id STRING,\n" +
                "  -- user id\n" +
                "  user_id BIGINT,\n" +
                "  -- price\n" +
                "  price BIGINT,\n" +
                "  -- event timestamp\n" +
                "  row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n" +
                "  -- watermark definition\n" +
                "  WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +
                ") WITH (\n" +
                "  'connector' = 'datagen',\n" +
                "  'rows-per-second' = '10',\n" +
                "  'fields.order_id.length' = '1',\n" +
                "  'fields.user_id.min' = '1',\n" +
                "  'fields.user_id.max' = '100000',\n" +
                "  'fields.price.min' = '1',\n" +
                "  'fields.price.max' = '100000'\n" +
                ")";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sourceDDL_2);

        String view = "create view IF NOT EXISTS test_view as select * from " +
                "(select * from source_table union all select * from source_table_2) tb1";
        tableEnv.executeSql(view);

        String sqlGroup = "select count(*)," +
                "UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start " +
                "from test_view " +
                "group by order_id,tumble(row_time, interval '1' minute)";
        tableEnv.executeSql(sqlGroup).print();
    }
}
{code}
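Not part of the original report, but one way to make the lost attribute visible: a small hypothetical diagnostic sketch, assuming the same {{tableEnv}}, source tables and {{test_view}} created by the program above, that prints the schemas right after the CREATE VIEW statement so runs with and without the HiveCatalog registration can be compared.

{code:java}
// Hypothetical diagnostic lines, appended after tableEnv.executeSql(view)
// in the program above; they only print schemas and do not change the job.

// If the event-time attribute survives the UNION ALL view, row_time is
// expected to be reported as "TIMESTAMP(3) *ROWTIME*"; the report suggests
// that with the HiveCatalog in use it degrades to a plain TIMESTAMP(3),
// which then triggers the window-aggregate exception.
tableEnv.executeSql("DESCRIBE source_table").print();
tableEnv.executeSql("DESCRIBE test_view").print();

// The same information through the Table API.
System.out.println(tableEnv.from("source_table").getSchema());
System.out.println(tableEnv.from("test_view").getSchema());
{code}

If the description above holds, comparing a run on the default in-memory catalog with one on {{dc_catalog}} should show at which point the ROWTIME marker disappears from the view's schema.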