wangbaohua created FLINK-24952: ---------------------------------- Summary: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before Key: FLINK-24952 URL: https://issues.apache.org/jira/browse/FLINK-24952 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.13.1 Environment: public void test() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(5000); //检查点 每5000ms env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); Properties browseProperties = new Properties(); browseProperties.put("bootstrap.servers", "192.168.1.25:9093"); browseProperties.put("group.id", "temporal"); browseProperties.put("auto.offset.reset", "latest"); PropTransformMap.getInstance().readConfigMap("./conf/cfg.properties"); Map<String, String> configMap = new HashMap<String, String>(); configMap.put(Constants.DB_JDBC_USER, "root"); configMap.put(Constants.DB_JDBC_PASSWD, "1qazXSW@3edc"); configMap.put(Constants.DB_JDBC_URL, "jdbc:mysql://192.168.1.25:3306/SSA?useUnicode=true&characterEncoding=utf-8"); configMap.put(Constants.DB_JDBC_DRIVER, "com.mysql.jdbc.Driver"); configMap.put(Constants.INITAL_POOL_SIZE, "10"); configMap.put(Constants.MIN_POOL_SIZE, "5"); configMap.put(Constants.MAX_IDLE_TIME, "50"); configMap.put(Constants.MAX_STATE_ELEMENTS, "100"); configMap.put(Constants.MAX_IDLE_TIME, "60"); DbFetcher dbFetcher = new DbFetcher(configMap); List<String> listRule = RuleReader.readRules(dbFetcher); System.out.println("ListRule::" + listRule.size()); final String RULE_SBROAD_CAST_STATE = "RulesBroadcastState"; RuleParse ruleParse = new RuleParse(); Map properties = new HashMap(); ruleParse.parseData("./conf/cfg.json"); //1、读取mysql的配置消息 DataStream<List<String>> conf = env.addSource(new MysqlSourceFunction1(dbFetcher)); //2、创建MapStateDescriptor规则,对广播的数据的数据类型的规则 MapStateDescriptor<String, List<String>> ruleStateDescriptor = new MapStateDescriptor<>(RULE_SBROAD_CAST_STATE , BasicTypeInfo.STRING_TYPE_INFO , new ListTypeInfo<>(String.class)); //3、对conf进行broadcast返回BroadcastStream final BroadcastStream<List<String>> confBroadcast = conf.broadcast(ruleStateDescriptor); //DataStream<String> dataStream = env.fromElements("{\"ORG_ID\":\"1\",\"RAW_MSG\":\"useradd,su - root\",\"EVENT_THREE_TYPE\":\"40001\",\"EVENT_TWO_TYPE\":\"40001\",\"SRC_PORT\":\"123\",\"DST_PORT\":\"124\",\"DST_IP\":\"10.16.254.11\",\"SRC_IP\":\"50.115.134.50\",\"CREATE_TIME\":\"2021-07-09 18:15:21.001\",\"DEVICE_PARENT_TYPE\":\"LINUX\",\"SNOW_ID\":\"85512\",\"EVENT_THREE_TYPE_DESC\":\"暴力破解失败\",\"ts\":\"2021-05-27 16:06:58\",\"ACCOUNT\":\"asap\",\"collectionName\":\"bwdOMS\",\"eRuleId\":\"0\",\"RULE_TJ_COUNT\":11,\"TAGS\":{\"EVENT_ONE_TYPE\":\"20000\",\"DIRECTION\":\"内部\",\"EVENT_TWO_TYPE\":\"10015\",\"EVENT_THREE_TYPE\":\"20101\"},\"DEVICE_TYPE\":\"OSM\",\"DIRECTION\":\"0\"}\n"); DataStream<String> dataStream = env.fromElements("{\"DIRECTION\":\"0\",\"ATTACK_STAGE\":\"命令控制\",\"DEVICE_PARENT_TYPE\":\"IPS\",\"URL\":\"www.baidu.com\",\"SRC_PORT\":\"58513\",\"DST_PORT\":\"31177\",\"RISK_LEVEL\":\"99\",\"SRC_ASSET_TYPE\":\"4\",\"SRC_ASSET_SUB_TYPE\":\"412\",\"DST_ASSET_TYPE\":\"4\",\"DST_ASSET_SUB_TYPE\":\"412\",\"SRC_POST\":\"1\",\"DST_POST\":\"0\",\"INSERT_TIME\":\"2021-05-01 00:00:00.000\",\"DST_ASSET_NAME\":\"ddde\",\"SRC_ASSET_NAME\":\"wangwu\",\"SCENE_ID\":-5216633060008277343,\"SOURCE\":\"4\",\"ASSET_IP\":\"73.243.143.114\",\"TENANT_ID\":\"-1\",\"ORG_ID\":\"1\",\"DST_IP\":\"192.118.8.218\",\"EVENT_TYPE\":\"1008\",\"SRC_IP\":\"153.79.42.45\",\"CUSTOM_VALUE1\":\"187.36.226.184\",\"CUSTOM_VALUE2\":\"41.68.25.104\",\"SRC_PROVINCE\":\"日本\",\"SNOW_ID\":\"469260998\",\"DEVICE_TYPE\":\"TDA\",\"MESSAGE\":\"\",\"CHARACTER\":\"\",\"CUSTOM_LABEL1\":\"控制IP\",\"CREATE_TIME\":\"2021-04-26 17:04:18.000\",\"CUSTOM_LABEL2\":\"受控IP\",\"TYPE\":\"未知\",\"MALWARE_TYPE\":\"其他\",\"collectTime\":\"2021-04-29 19:40:36.000\",\"RULE_ID\":\"180607832\",\"DEVICE_IP\":\"239.150.69.203\",\"SRC_CITY\":\"日本\",\"recordTime\":\"2021-04-30 11:36:00.757\",\"equIP\":\"20.222.74.177\",\"DST_ZONE\":\"中国-湖北-武汉\",\"SERVERITY\":\"99\",\"SRC_COUNTRY\":\"日本\",\"SRC_ZONE\":\"意大利\",\"DST_COUNTRY\":\"中国\",\"DST_CITY\":\"广东\",\"DST_PROVINCE\":\"南京\",\"EVENT_ONE_TYPE\":\"-1\",\"EVENT_TWO_TYPE\":\"40001\",\"EVENT_NAME\":\"流氓软件\",\"EVENT_THREE_TYPE\":\"40001\",\"eRuleId\":\"0\",\"hitRuleIds\":\"1453556891900301314:Ordinary:A\"}", "{\"DIRECTION\":\"0\",\"ATTACK_STAGE\":\"命令控制\",\"DEVICE_PARENT_TYPE\":\"IPS\",\"URL\":\"www.baidu.com\",\"SRC_PORT\":\"58513\",\"DST_PORT\":\"31177\",\"RISK_LEVEL\":\"99\",\"SRC_ASSET_TYPE\":\"4\",\"SRC_ASSET_SUB_TYPE\":\"412\",\"DST_ASSET_TYPE\":\"4\",\"DST_ASSET_SUB_TYPE\":\"412\",\"SRC_POST\":\"1\",\"DST_POST\":\"0\",\"INSERT_TIME\":\"2021-05-01 00:00:00.000\",\"DST_ASSET_NAME\":\"ddde\",\"SRC_ASSET_NAME\":\"wangwu\",\"SCENE_ID\":-5216633060008277343,\"SOURCE\":\"4\",\"ASSET_IP\":\"73.243.143.114\",\"TENANT_ID\":\"-1\",\"ORG_ID\":\"1\",\"DST_IP\":\"192.118.8.218\",\"EVENT_TYPE\":\"1008\",\"SRC_IP\":\"153.79.42.45\",\"CUSTOM_VALUE1\":\"187.36.226.184\",\"CUSTOM_VALUE2\":\"41.68.25.104\",\"SRC_PROVINCE\":\"日本\",\"SNOW_ID\":\"469260998\",\"DEVICE_TYPE\":\"TDA\",\"MESSAGE\":\"\",\"CHARACTER\":\"\",\"CUSTOM_LABEL1\":\"控制IP\",\"CREATE_TIME\":\"2021-04-25 17:04:18.000\",\"CUSTOM_LABEL2\":\"受控IP\",\"TYPE\":\"未知\",\"MALWARE_TYPE\":\"其他\",\"collectTime\":\"2021-04-29 19:40:36.000\",\"RULE_ID\":\"180607832\",\"DEVICE_IP\":\"239.150.69.203\",\"SRC_CITY\":\"日本\",\"recordTime\":\"2021-04-30 11:36:00.757\",\"equIP\":\"20.222.74.177\",\"DST_ZONE\":\"中国-湖北-武汉\",\"SERVERITY\":\"99\",\"SRC_COUNTRY\":\"日本\",\"SRC_ZONE\":\"意大利\",\"DST_COUNTRY\":\"中国\",\"DST_CITY\":\"广东\",\"DST_PROVINCE\":\"南京\",\"EVENT_ONE_TYPE\":\"-1\",\"EVENT_TWO_TYPE\":\"40001\",\"EVENT_NAME\":\"流氓软件\",\"EVENT_THREE_TYPE\":\"40001\",\"eRuleId\":\"0\",\"hitRuleIds\":\"1453556891900301314:Ordinary:A\"}", "{\"DIRECTION\":\"0\",\"ATTACK_STAGE\":\"命令控制\",\"DEVICE_PARENT_TYPE\":\"IPS\",\"URL\":\"www.baidu.com\",\"SRC_PORT\":\"58513\",\"DST_PORT\":\"31177\",\"RISK_LEVEL\":\"99\",\"SRC_ASSET_TYPE\":\"4\",\"SRC_ASSET_SUB_TYPE\":\"412\",\"DST_ASSET_TYPE\":\"4\",\"DST_ASSET_SUB_TYPE\":\"412\",\"SRC_POST\":\"1\",\"DST_POST\":\"0\",\"INSERT_TIME\":\"2021-05-01 00:00:00.000\",\"DST_ASSET_NAME\":\"ddde\",\"SRC_ASSET_NAME\":\"wangwu\",\"SCENE_ID\":-5216633060008277343,\"SOURCE\":\"4\",\"ASSET_IP\":\"73.243.143.114\",\"TENANT_ID\":\"-1\",\"ORG_ID\":\"1\",\"DST_IP\":\"192.118.8.218\",\"EVENT_TYPE\":\"1008\",\"SRC_IP\":\"153.79.42.45\",\"CUSTOM_VALUE1\":\"187.36.226.184\",\"CUSTOM_VALUE2\":\"41.68.25.104\",\"SRC_PROVINCE\":\"日本\",\"SNOW_ID\":\"469260998\",\"DEVICE_TYPE\":\"TDA\",\"MESSAGE\":\"\",\"CHARACTER\":\"\",\"CUSTOM_LABEL1\":\"控制IP\",\"CREATE_TIME\":\"2021-04-25 17:04:38.000\",\"CUSTOM_LABEL2\":\"受控IP\",\"TYPE\":\"未知\",\"MALWARE_TYPE\":\"其他\",\"collectTime\":\"2021-04-29 19:40:36.000\",\"RULE_ID\":\"180607832\",\"DEVICE_IP\":\"239.150.69.203\",\"SRC_CITY\":\"日本\",\"recordTime\":\"2021-04-30 11:36:00.757\",\"equIP\":\"20.222.74.177\",\"DST_ZONE\":\"中国-湖北-武汉\",\"SERVERITY\":\"99\",\"SRC_COUNTRY\":\"日本\",\"SRC_ZONE\":\"意大利\",\"DST_COUNTRY\":\"中国\",\"DST_CITY\":\"广东\",\"DST_PROVINCE\":\"南京\",\"EVENT_ONE_TYPE\":\"-1\",\"EVENT_TWO_TYPE\":\"40001\",\"EVENT_NAME\":\"流氓软件\",\"EVENT_THREE_TYPE\":\"40001\",\"eRuleId\":\"0\",\"hitRuleIds\":\"1453556891900301314:Ordinary:A\"}"); DataStream<StandardEvent> kafkaData = dataStream .map(new MapFunction<String, StandardEvent>() { @Override public StandardEvent map(String value) throws Exception { StandardEvent standardEvent = StandardEvent.parse(value); System.out.println("standardEvent:"+standardEvent.getField("RAW_MSG")); return standardEvent; } }); DataStream<BeanField> kafkaData1 = kafkaData.map( new dealMapFunction(ruleParse)); EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings); Table inputTable = blinkStreamTableEnv.fromDataStream(kafkaData1, Schema.newBuilder() .column("snowId","STRING") .column("deviceType","STRING") .column("createTime","TIMESTAMP_LTZ(3)") .watermark("createTime", "SOURCE_WATERMARK()") .build()); blinkStreamTableEnv.createTemporaryView("InputTable", inputTable); // Table resultTable = blinkStreamTableEnv.sqlQuery("SELECT * FROM InputTable"); // blinkStreamTableEnv.toRetractStream(resultTable, BeanField.class).print("query=="); String querySQL0 = "select deviceType,count(1) from InputTable where deviceType='TDA' group by TUMBLE(createTime, INTERVAL '5' MINUTE),deviceType"; String querySQL1 = "select * from (select deviceType,count(1) from InputTable where deviceType='TDA' group by TUMBLE(createTime, INTERVAL '5' MINUTE),deviceType) a,(select deviceType,count(1) from InputTable where deviceType='TDA' group by TUMBLE(createTime, INTERVAL '5' MINUTE),deviceType) b"; String querySQL2 = "select deviceType,count(1) from InputTable where deviceType='OSM' group by TUMBLE(createTime, INTERVAL '5' MINUTE),deviceType "; String querySQL3 = "select window_start, window_end,deviceType,snowId from TABLE(\n" + " TUMBLE(TABLE InputTable, DESCRIPTOR(createTime), INTERVAL '10' MINUTES))" + " where deviceType='TDA' GROUP BY window_start, window_end,deviceType,snowId"; String querySQL4 = "select b.* from (select deviceType,snowId from TABLE(\n" + "TUMBLE(TABLE InputTable, DESCRIPTOR(createTime), INTERVAL '10' MINUTES))" + " where deviceType='TDA' GROUP BY window_start, window_end,deviceType,snowId) a,(SELECT * FROM InputTable) b" + " where a.snowId =b.snowId"; System.out.println(querySQL4); Table table2 = blinkStreamTableEnv.sqlQuery(querySQL4); blinkStreamTableEnv.toRetractStream(table2, BeanField.class).print("query2=="); // // String sql = "select * from (select deviceType,count(1) from InputTable where deviceType='TDA' group by TUMBLE(createTime, INTERVAL '5' MINUTE),deviceType) a,(select deviceType,count(1) from InputTable where deviceType='IDS' group by TUMBLE(createTime, INTERVAL '5' MINUTE),deviceType) b,(select deviceType,count(1) from InputTable where deviceType='IDS' group by TUMBLE(createTime, INTERVAL '5' MINUTE),deviceType) c"; // Table resultTableIps= blinkStreamTableEnv.sqlQuery(sql); // blinkStreamTableEnv.toRetractStream(resultTableIps, Row.class).print("queryIps=="); env.execute("Broadcast test kafka"); } Reporter: wangbaohua Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before. Please check the documentation for the set of currently supported SQL features. at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:82) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:439) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:528) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:517) at rete.ReteDemo4.test(ReteDemo4.java:478) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58) Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before. at org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalJoinRule.matches(StreamPhysicalJoinRule.scala:79) at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:284) at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411) at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411) at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:268) at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:985) at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1245) at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84) at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268) at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132) at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84) at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268) at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132) at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604) at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:486) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:309) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69) ... 49 more Process finished with exit code -1 -- This message was sent by Atlassian Jira (v8.20.1#820001)