[
https://issues.apache.org/jira/browse/FLINK-24952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17445787#comment-17445787
]
wangbaohua commented on FLINK-24952:
------------------------------------
public void test() throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(5000); //检查点 每5000ms
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties browseProperties = new Properties();
browseProperties.put("bootstrap.servers", "192.168.1.25:9093");
browseProperties.put("group.id", "temporal");
browseProperties.put("auto.offset.reset", "latest");
PropTransformMap.getInstance().readConfigMap("./conf/cfg.properties");
Map<String, String> configMap = new HashMap<String, String>();
configMap.put(Constants.DB_JDBC_USER, "root");
configMap.put(Constants.DB_JDBC_PASSWD, "1qazXSW@3edc");
configMap.put(Constants.DB_JDBC_URL,
"jdbc:mysql://192.168.1.25:3306/SSA?useUnicode=true&characterEncoding=utf-8");
configMap.put(Constants.DB_JDBC_DRIVER,
"com.mysql.jdbc.Driver");
configMap.put(Constants.INITAL_POOL_SIZE, "10");
configMap.put(Constants.MIN_POOL_SIZE, "5");
configMap.put(Constants.MAX_IDLE_TIME, "50");
configMap.put(Constants.MAX_STATE_ELEMENTS, "100");
configMap.put(Constants.MAX_IDLE_TIME, "60");
DbFetcher dbFetcher = new DbFetcher(configMap);
List<String> listRule = RuleReader.readRules(dbFetcher);
System.out.println("ListRule::" + listRule.size());
final String RULE_SBROAD_CAST_STATE = "RulesBroadcastState";
RuleParse ruleParse = new RuleParse();
Map properties = new HashMap();
ruleParse.parseData("./conf/cfg.json");
//1、读取mysql的配置消息
DataStream<List<String>> conf = env.addSource(new
MysqlSourceFunction1(dbFetcher));
//2、创建MapStateDescriptor规则,对广播的数据的数据类型的规则
MapStateDescriptor<String, List<String>> ruleStateDescriptor =
new MapStateDescriptor<>(RULE_SBROAD_CAST_STATE
, BasicTypeInfo.STRING_TYPE_INFO
, new ListTypeInfo<>(String.class));
//3、对conf进行broadcast返回BroadcastStream
final BroadcastStream<List<String>> confBroadcast =
conf.broadcast(ruleStateDescriptor);
//DataStream<String> dataStream =
env.fromElements("{\"ORG_ID\":\"1\",\"RAW_MSG\":\"useradd,su -
root\",\"EVENT_THREE_TYPE\":\"40001\",\"EVENT_TWO_TYPE\":\"40001\",\"SRC_PORT\":\"123\",\"DST_PORT\":\"124\",\"DST_IP\":\"10.16.254.11\",\"SRC_IP\":\"50.115.134.50\",\"CREATE_TIME\":\"2021-07-09
18:15:21.001\",\"DEVICE_PARENT_TYPE\":\"LINUX\",\"SNOW_ID\":\"85512\",\"EVENT_THREE_TYPE_DESC\":\"暴力破解失败\",\"ts\":\"2021-05-27
16:06:58\",\"ACCOUNT\":\"asap\",\"collectionName\":\"bwdOMS\",\"eRuleId\":\"0\",\"RULE_TJ_COUNT\":11,\"TAGS\":{\"EVENT_ONE_TYPE\":\"20000\",\"DIRECTION\":\"内部\",\"EVENT_TWO_TYPE\":\"10015\",\"EVENT_THREE_TYPE\":\"20101\"},\"DEVICE_TYPE\":\"OSM\",\"DIRECTION\":\"0\"}\n");
DataStream<String> dataStream =
env.fromElements("{\"DIRECTION\":\"0\",\"ATTACK_STAGE\":\"命令控制\",\"DEVICE_PARENT_TYPE\":\"IPS\",\"URL\":\"www.baidu.com\",\"SRC_PORT\":\"58513\",\"DST_PORT\":\"31177\",\"RISK_LEVEL\":\"99\",\"SRC_ASSET_TYPE\":\"4\",\"SRC_ASSET_SUB_TYPE\":\"412\",\"DST_ASSET_TYPE\":\"4\",\"DST_ASSET_SUB_TYPE\":\"412\",\"SRC_POST\":\"1\",\"DST_POST\":\"0\",\"INSERT_TIME\":\"2021-05-01
00:00:00.000\",\"DST_ASSET_NAME\":\"ddde\",\"SRC_ASSET_NAME\":\"wangwu\",\"SCENE_ID\":-5216633060008277343,\"SOURCE\":\"4\",\"ASSET_IP\":\"73.243.143.114\",\"TENANT_ID\":\"-1\",\"ORG_ID\":\"1\",\"DST_IP\":\"192.118.8.218\",\"EVENT_TYPE\":\"1008\",\"SRC_IP\":\"153.79.42.45\",\"CUSTOM_VALUE1\":\"187.36.226.184\",\"CUSTOM_VALUE2\":\"41.68.25.104\",\"SRC_PROVINCE\":\"日本\",\"SNOW_ID\":\"469260998\",\"DEVICE_TYPE\":\"TDA\",\"MESSAGE\":\"\",\"CHARACTER\":\"\",\"CUSTOM_LABEL1\":\"控制IP\",\"CREATE_TIME\":\"2021-04-26
17:04:18.000\",\"CUSTOM_LABEL2\":\"受控IP\",\"TYPE\":\"未知\",\"MALWARE_TYPE\":\"其他\",\"collectTime\":\"2021-04-29
19:40:36.000\",\"RULE_ID\":\"180607832\",\"DEVICE_IP\":\"239.150.69.203\",\"SRC_CITY\":\"日本\",\"recordTime\":\"2021-04-30
11:36:00.757\",\"equIP\":\"20.222.74.177\",\"DST_ZONE\":\"中国-湖北-武汉\",\"SERVERITY\":\"99\",\"SRC_COUNTRY\":\"日本\",\"SRC_ZONE\":\"意大利\",\"DST_COUNTRY\":\"中国\",\"DST_CITY\":\"广东\",\"DST_PROVINCE\":\"南京\",\"EVENT_ONE_TYPE\":\"-1\",\"EVENT_TWO_TYPE\":\"40001\",\"EVENT_NAME\":\"流氓软件\",\"EVENT_THREE_TYPE\":\"40001\",\"eRuleId\":\"0\",\"hitRuleIds\":\"1453556891900301314:Ordinary:A\"}",
"{\"DIRECTION\":\"0\",\"ATTACK_STAGE\":\"命令控制\",\"DEVICE_PARENT_TYPE\":\"IPS\",\"URL\":\"www.baidu.com\",\"SRC_PORT\":\"58513\",\"DST_PORT\":\"31177\",\"RISK_LEVEL\":\"99\",\"SRC_ASSET_TYPE\":\"4\",\"SRC_ASSET_SUB_TYPE\":\"412\",\"DST_ASSET_TYPE\":\"4\",\"DST_ASSET_SUB_TYPE\":\"412\",\"SRC_POST\":\"1\",\"DST_POST\":\"0\",\"INSERT_TIME\":\"2021-05-01
00:00:00.000\",\"DST_ASSET_NAME\":\"ddde\",\"SRC_ASSET_NAME\":\"wangwu\",\"SCENE_ID\":-5216633060008277343,\"SOURCE\":\"4\",\"ASSET_IP\":\"73.243.143.114\",\"TENANT_ID\":\"-1\",\"ORG_ID\":\"1\",\"DST_IP\":\"192.118.8.218\",\"EVENT_TYPE\":\"1008\",\"SRC_IP\":\"153.79.42.45\",\"CUSTOM_VALUE1\":\"187.36.226.184\",\"CUSTOM_VALUE2\":\"41.68.25.104\",\"SRC_PROVINCE\":\"日本\",\"SNOW_ID\":\"469260998\",\"DEVICE_TYPE\":\"TDA\",\"MESSAGE\":\"\",\"CHARACTER\":\"\",\"CUSTOM_LABEL1\":\"控制IP\",\"CREATE_TIME\":\"2021-04-25
17:04:18.000\",\"CUSTOM_LABEL2\":\"受控IP\",\"TYPE\":\"未知\",\"MALWARE_TYPE\":\"其他\",\"collectTime\":\"2021-04-29
19:40:36.000\",\"RULE_ID\":\"180607832\",\"DEVICE_IP\":\"239.150.69.203\",\"SRC_CITY\":\"日本\",\"recordTime\":\"2021-04-30
11:36:00.757\",\"equIP\":\"20.222.74.177\",\"DST_ZONE\":\"中国-湖北-武汉\",\"SERVERITY\":\"99\",\"SRC_COUNTRY\":\"日本\",\"SRC_ZONE\":\"意大利\",\"DST_COUNTRY\":\"中国\",\"DST_CITY\":\"广东\",\"DST_PROVINCE\":\"南京\",\"EVENT_ONE_TYPE\":\"-1\",\"EVENT_TWO_TYPE\":\"40001\",\"EVENT_NAME\":\"流氓软件\",\"EVENT_THREE_TYPE\":\"40001\",\"eRuleId\":\"0\",\"hitRuleIds\":\"1453556891900301314:Ordinary:A\"}",
"{\"DIRECTION\":\"0\",\"ATTACK_STAGE\":\"命令控制\",\"DEVICE_PARENT_TYPE\":\"IPS\",\"URL\":\"www.baidu.com\",\"SRC_PORT\":\"58513\",\"DST_PORT\":\"31177\",\"RISK_LEVEL\":\"99\",\"SRC_ASSET_TYPE\":\"4\",\"SRC_ASSET_SUB_TYPE\":\"412\",\"DST_ASSET_TYPE\":\"4\",\"DST_ASSET_SUB_TYPE\":\"412\",\"SRC_POST\":\"1\",\"DST_POST\":\"0\",\"INSERT_TIME\":\"2021-05-01
00:00:00.000\",\"DST_ASSET_NAME\":\"ddde\",\"SRC_ASSET_NAME\":\"wangwu\",\"SCENE_ID\":-5216633060008277343,\"SOURCE\":\"4\",\"ASSET_IP\":\"73.243.143.114\",\"TENANT_ID\":\"-1\",\"ORG_ID\":\"1\",\"DST_IP\":\"192.118.8.218\",\"EVENT_TYPE\":\"1008\",\"SRC_IP\":\"153.79.42.45\",\"CUSTOM_VALUE1\":\"187.36.226.184\",\"CUSTOM_VALUE2\":\"41.68.25.104\",\"SRC_PROVINCE\":\"日本\",\"SNOW_ID\":\"469260998\",\"DEVICE_TYPE\":\"TDA\",\"MESSAGE\":\"\",\"CHARACTER\":\"\",\"CUSTOM_LABEL1\":\"控制IP\",\"CREATE_TIME\":\"2021-04-25
17:04:38.000\",\"CUSTOM_LABEL2\":\"受控IP\",\"TYPE\":\"未知\",\"MALWARE_TYPE\":\"其他\",\"collectTime\":\"2021-04-29
19:40:36.000\",\"RULE_ID\":\"180607832\",\"DEVICE_IP\":\"239.150.69.203\",\"SRC_CITY\":\"日本\",\"recordTime\":\"2021-04-30
11:36:00.757\",\"equIP\":\"20.222.74.177\",\"DST_ZONE\":\"中国-湖北-武汉\",\"SERVERITY\":\"99\",\"SRC_COUNTRY\":\"日本\",\"SRC_ZONE\":\"意大利\",\"DST_COUNTRY\":\"中国\",\"DST_CITY\":\"广东\",\"DST_PROVINCE\":\"南京\",\"EVENT_ONE_TYPE\":\"-1\",\"EVENT_TWO_TYPE\":\"40001\",\"EVENT_NAME\":\"流氓软件\",\"EVENT_THREE_TYPE\":\"40001\",\"eRuleId\":\"0\",\"hitRuleIds\":\"1453556891900301314:Ordinary:A\"}");
DataStream<StandardEvent> kafkaData = dataStream
.map(new MapFunction<String, StandardEvent>() {
@Override
public StandardEvent map(String value)
throws Exception {
StandardEvent standardEvent =
StandardEvent.parse(value);
System.out.println("standardEvent:"+standardEvent.getField("RAW_MSG"));
return standardEvent;
}
});
DataStream<BeanField> kafkaData1 = kafkaData.map( new
dealMapFunction(ruleParse));
EnvironmentSettings blinkStreamSettings =
EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment blinkStreamTableEnv =
StreamTableEnvironment.create(env, blinkStreamSettings);
Table inputTable =
blinkStreamTableEnv.fromDataStream(kafkaData1,
Schema.newBuilder()
.column("snowId","STRING")
.column("deviceType","STRING")
.column("createTime","TIMESTAMP_LTZ(3)")
.watermark("createTime",
"SOURCE_WATERMARK()")
.build());
blinkStreamTableEnv.createTemporaryView("InputTable",
inputTable);
// Table resultTable = blinkStreamTableEnv.sqlQuery("SELECT * FROM
InputTable");
// blinkStreamTableEnv.toRetractStream(resultTable,
BeanField.class).print("query==");
String querySQL0 = "select deviceType,count(1) from InputTable
where deviceType='TDA' group by TUMBLE(createTime, INTERVAL '5'
MINUTE),deviceType";
String querySQL1 = "select * from (select deviceType,count(1)
from InputTable where deviceType='TDA' group by TUMBLE(createTime, INTERVAL '5'
MINUTE),deviceType) a,(select deviceType,count(1) from InputTable where
deviceType='TDA' group by TUMBLE(createTime, INTERVAL '5' MINUTE),deviceType)
b";
String querySQL2 = "select deviceType,count(1) from InputTable
where deviceType='OSM' group by TUMBLE(createTime, INTERVAL '5'
MINUTE),deviceType ";
String querySQL3 = "select window_start,
window_end,deviceType,snowId from TABLE(\n" +
" TUMBLE(TABLE InputTable,
DESCRIPTOR(createTime), INTERVAL '10' MINUTES))" +
" where deviceType='TDA' GROUP BY window_start,
window_end,deviceType,snowId";
String querySQL4 = "select b.* from (select deviceType,snowId
from TABLE(\n" +
"TUMBLE(TABLE InputTable,
DESCRIPTOR(createTime), INTERVAL '10' MINUTES))" +
" where deviceType='TDA' GROUP BY window_start,
window_end,deviceType,snowId) a,(SELECT * FROM InputTable) b" +
" where a.snowId =b.snowId";
System.out.println(querySQL4);
Table table2 = blinkStreamTableEnv.sqlQuery(querySQL4);
blinkStreamTableEnv.toRetractStream(table2,
BeanField.class).print("query2==");
//
// String sql = "select * from (select deviceType,count(1) from
InputTable where deviceType='TDA' group by TUMBLE(createTime, INTERVAL '5'
MINUTE),deviceType) a,(select deviceType,count(1) from InputTable where
deviceType='IDS' group by TUMBLE(createTime, INTERVAL '5' MINUTE),deviceType)
b,(select deviceType,count(1) from InputTable where deviceType='IDS' group by
TUMBLE(createTime, INTERVAL '5' MINUTE),deviceType) c";
// Table resultTableIps= blinkStreamTableEnv.sqlQuery(sql);
// blinkStreamTableEnv.toRetractStream(resultTableIps,
Row.class).print("queryIps==");
env.execute("Broadcast test kafka");
}
> Rowtime attributes must not be in the input rows of a regular join. As a
> workaround you can cast the time attributes of input tables to TIMESTAMP
> before
> --------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-24952
> URL: https://issues.apache.org/jira/browse/FLINK-24952
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.13.1
> Environment: public void test() throws Exception {
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.enableCheckpointing(5000); //检查点 每5000ms
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> Properties browseProperties = new Properties();
> browseProperties.put("bootstrap.servers", "192.168.1.25:9093");
> browseProperties.put("group.id", "temporal");
> browseProperties.put("auto.offset.reset", "latest");
>
> PropTransformMap.getInstance().readConfigMap("./conf/cfg.properties");
> Map<String, String> configMap = new HashMap<String, String>();
> configMap.put(Constants.DB_JDBC_USER, "root");
> configMap.put(Constants.DB_JDBC_PASSWD, "1qazXSW@3edc");
> configMap.put(Constants.DB_JDBC_URL,
> "jdbc:mysql://192.168.1.25:3306/SSA?useUnicode=true&characterEncoding=utf-8");
> configMap.put(Constants.DB_JDBC_DRIVER,
> "com.mysql.jdbc.Driver");
> configMap.put(Constants.INITAL_POOL_SIZE, "10");
> configMap.put(Constants.MIN_POOL_SIZE, "5");
> configMap.put(Constants.MAX_IDLE_TIME, "50");
> configMap.put(Constants.MAX_STATE_ELEMENTS, "100");
> configMap.put(Constants.MAX_IDLE_TIME, "60");
> DbFetcher dbFetcher = new DbFetcher(configMap);
> List<String> listRule = RuleReader.readRules(dbFetcher);
> System.out.println("ListRule::" + listRule.size());
> final String RULE_SBROAD_CAST_STATE = "RulesBroadcastState";
> RuleParse ruleParse = new RuleParse();
> Map properties = new HashMap();
> ruleParse.parseData("./conf/cfg.json");
> //1、读取mysql的配置消息
> DataStream<List<String>> conf = env.addSource(new
> MysqlSourceFunction1(dbFetcher));
> //2、创建MapStateDescriptor规则,对广播的数据的数据类型的规则
> MapStateDescriptor<String, List<String>> ruleStateDescriptor =
> new MapStateDescriptor<>(RULE_SBROAD_CAST_STATE
> , BasicTypeInfo.STRING_TYPE_INFO
> , new ListTypeInfo<>(String.class));
> //3、对conf进行broadcast返回BroadcastStream
> final BroadcastStream<List<String>> confBroadcast =
> conf.broadcast(ruleStateDescriptor);
> //DataStream<String> dataStream =
> env.fromElements("{\"ORG_ID\":\"1\",\"RAW_MSG\":\"useradd,su -
> root\",\"EVENT_THREE_TYPE\":\"40001\",\"EVENT_TWO_TYPE\":\"40001\",\"SRC_PORT\":\"123\",\"DST_PORT\":\"124\",\"DST_IP\":\"10.16.254.11\",\"SRC_IP\":\"50.115.134.50\",\"CREATE_TIME\":\"2021-07-09
>
> 18:15:21.001\",\"DEVICE_PARENT_TYPE\":\"LINUX\",\"SNOW_ID\":\"85512\",\"EVENT_THREE_TYPE_DESC\":\"暴力破解失败\",\"ts\":\"2021-05-27
>
> 16:06:58\",\"ACCOUNT\":\"asap\",\"collectionName\":\"bwdOMS\",\"eRuleId\":\"0\",\"RULE_TJ_COUNT\":11,\"TAGS\":{\"EVENT_ONE_TYPE\":\"20000\",\"DIRECTION\":\"内部\",\"EVENT_TWO_TYPE\":\"10015\",\"EVENT_THREE_TYPE\":\"20101\"},\"DEVICE_TYPE\":\"OSM\",\"DIRECTION\":\"0\"}\n");
> DataStream<String> dataStream =
> env.fromElements("{\"DIRECTION\":\"0\",\"ATTACK_STAGE\":\"命令控制\",\"DEVICE_PARENT_TYPE\":\"IPS\",\"URL\":\"www.baidu.com\",\"SRC_PORT\":\"58513\",\"DST_PORT\":\"31177\",\"RISK_LEVEL\":\"99\",\"SRC_ASSET_TYPE\":\"4\",\"SRC_ASSET_SUB_TYPE\":\"412\",\"DST_ASSET_TYPE\":\"4\",\"DST_ASSET_SUB_TYPE\":\"412\",\"SRC_POST\":\"1\",\"DST_POST\":\"0\",\"INSERT_TIME\":\"2021-05-01
>
> 00:00:00.000\",\"DST_ASSET_NAME\":\"ddde\",\"SRC_ASSET_NAME\":\"wangwu\",\"SCENE_ID\":-5216633060008277343,\"SOURCE\":\"4\",\"ASSET_IP\":\"73.243.143.114\",\"TENANT_ID\":\"-1\",\"ORG_ID\":\"1\",\"DST_IP\":\"192.118.8.218\",\"EVENT_TYPE\":\"1008\",\"SRC_IP\":\"153.79.42.45\",\"CUSTOM_VALUE1\":\"187.36.226.184\",\"CUSTOM_VALUE2\":\"41.68.25.104\",\"SRC_PROVINCE\":\"日本\",\"SNOW_ID\":\"469260998\",\"DEVICE_TYPE\":\"TDA\",\"MESSAGE\":\"\",\"CHARACTER\":\"\",\"CUSTOM_LABEL1\":\"控制IP\",\"CREATE_TIME\":\"2021-04-26
>
> 17:04:18.000\",\"CUSTOM_LABEL2\":\"受控IP\",\"TYPE\":\"未知\",\"MALWARE_TYPE\":\"其他\",\"collectTime\":\"2021-04-29
>
> 19:40:36.000\",\"RULE_ID\":\"180607832\",\"DEVICE_IP\":\"239.150.69.203\",\"SRC_CITY\":\"日本\",\"recordTime\":\"2021-04-30
>
> 11:36:00.757\",\"equIP\":\"20.222.74.177\",\"DST_ZONE\":\"中国-湖北-武汉\",\"SERVERITY\":\"99\",\"SRC_COUNTRY\":\"日本\",\"SRC_ZONE\":\"意大利\",\"DST_COUNTRY\":\"中国\",\"DST_CITY\":\"广东\",\"DST_PROVINCE\":\"南京\",\"EVENT_ONE_TYPE\":\"-1\",\"EVENT_TWO_TYPE\":\"40001\",\"EVENT_NAME\":\"流氓软件\",\"EVENT_THREE_TYPE\":\"40001\",\"eRuleId\":\"0\",\"hitRuleIds\":\"1453556891900301314:Ordinary:A\"}",
>
> "{\"DIRECTION\":\"0\",\"ATTACK_STAGE\":\"命令控制\",\"DEVICE_PARENT_TYPE\":\"IPS\",\"URL\":\"www.baidu.com\",\"SRC_PORT\":\"58513\",\"DST_PORT\":\"31177\",\"RISK_LEVEL\":\"99\",\"SRC_ASSET_TYPE\":\"4\",\"SRC_ASSET_SUB_TYPE\":\"412\",\"DST_ASSET_TYPE\":\"4\",\"DST_ASSET_SUB_TYPE\":\"412\",\"SRC_POST\":\"1\",\"DST_POST\":\"0\",\"INSERT_TIME\":\"2021-05-01
>
> 00:00:00.000\",\"DST_ASSET_NAME\":\"ddde\",\"SRC_ASSET_NAME\":\"wangwu\",\"SCENE_ID\":-5216633060008277343,\"SOURCE\":\"4\",\"ASSET_IP\":\"73.243.143.114\",\"TENANT_ID\":\"-1\",\"ORG_ID\":\"1\",\"DST_IP\":\"192.118.8.218\",\"EVENT_TYPE\":\"1008\",\"SRC_IP\":\"153.79.42.45\",\"CUSTOM_VALUE1\":\"187.36.226.184\",\"CUSTOM_VALUE2\":\"41.68.25.104\",\"SRC_PROVINCE\":\"日本\",\"SNOW_ID\":\"469260998\",\"DEVICE_TYPE\":\"TDA\",\"MESSAGE\":\"\",\"CHARACTER\":\"\",\"CUSTOM_LABEL1\":\"控制IP\",\"CREATE_TIME\":\"2021-04-25
>
> 17:04:18.000\",\"CUSTOM_LABEL2\":\"受控IP\",\"TYPE\":\"未知\",\"MALWARE_TYPE\":\"其他\",\"collectTime\":\"2021-04-29
>
> 19:40:36.000\",\"RULE_ID\":\"180607832\",\"DEVICE_IP\":\"239.150.69.203\",\"SRC_CITY\":\"日本\",\"recordTime\":\"2021-04-30
>
> 11:36:00.757\",\"equIP\":\"20.222.74.177\",\"DST_ZONE\":\"中国-湖北-武汉\",\"SERVERITY\":\"99\",\"SRC_COUNTRY\":\"日本\",\"SRC_ZONE\":\"意大利\",\"DST_COUNTRY\":\"中国\",\"DST_CITY\":\"广东\",\"DST_PROVINCE\":\"南京\",\"EVENT_ONE_TYPE\":\"-1\",\"EVENT_TWO_TYPE\":\"40001\",\"EVENT_NAME\":\"流氓软件\",\"EVENT_THREE_TYPE\":\"40001\",\"eRuleId\":\"0\",\"hitRuleIds\":\"1453556891900301314:Ordinary:A\"}",
>
> "{\"DIRECTION\":\"0\",\"ATTACK_STAGE\":\"命令控制\",\"DEVICE_PARENT_TYPE\":\"IPS\",\"URL\":\"www.baidu.com\",\"SRC_PORT\":\"58513\",\"DST_PORT\":\"31177\",\"RISK_LEVEL\":\"99\",\"SRC_ASSET_TYPE\":\"4\",\"SRC_ASSET_SUB_TYPE\":\"412\",\"DST_ASSET_TYPE\":\"4\",\"DST_ASSET_SUB_TYPE\":\"412\",\"SRC_POST\":\"1\",\"DST_POST\":\"0\",\"INSERT_TIME\":\"2021-05-01
>
> 00:00:00.000\",\"DST_ASSET_NAME\":\"ddde\",\"SRC_ASSET_NAME\":\"wangwu\",\"SCENE_ID\":-5216633060008277343,\"SOURCE\":\"4\",\"ASSET_IP\":\"73.243.143.114\",\"TENANT_ID\":\"-1\",\"ORG_ID\":\"1\",\"DST_IP\":\"192.118.8.218\",\"EVENT_TYPE\":\"1008\",\"SRC_IP\":\"153.79.42.45\",\"CUSTOM_VALUE1\":\"187.36.226.184\",\"CUSTOM_VALUE2\":\"41.68.25.104\",\"SRC_PROVINCE\":\"日本\",\"SNOW_ID\":\"469260998\",\"DEVICE_TYPE\":\"TDA\",\"MESSAGE\":\"\",\"CHARACTER\":\"\",\"CUSTOM_LABEL1\":\"控制IP\",\"CREATE_TIME\":\"2021-04-25
>
> 17:04:38.000\",\"CUSTOM_LABEL2\":\"受控IP\",\"TYPE\":\"未知\",\"MALWARE_TYPE\":\"其他\",\"collectTime\":\"2021-04-29
>
> 19:40:36.000\",\"RULE_ID\":\"180607832\",\"DEVICE_IP\":\"239.150.69.203\",\"SRC_CITY\":\"日本\",\"recordTime\":\"2021-04-30
>
> 11:36:00.757\",\"equIP\":\"20.222.74.177\",\"DST_ZONE\":\"中国-湖北-武汉\",\"SERVERITY\":\"99\",\"SRC_COUNTRY\":\"日本\",\"SRC_ZONE\":\"意大利\",\"DST_COUNTRY\":\"中国\",\"DST_CITY\":\"广东\",\"DST_PROVINCE\":\"南京\",\"EVENT_ONE_TYPE\":\"-1\",\"EVENT_TWO_TYPE\":\"40001\",\"EVENT_NAME\":\"流氓软件\",\"EVENT_THREE_TYPE\":\"40001\",\"eRuleId\":\"0\",\"hitRuleIds\":\"1453556891900301314:Ordinary:A\"}");
> DataStream<StandardEvent> kafkaData = dataStream
> .map(new MapFunction<String, StandardEvent>() {
> @Override
> public StandardEvent map(String value)
> throws Exception {
> StandardEvent standardEvent =
> StandardEvent.parse(value);
>
> System.out.println("standardEvent:"+standardEvent.getField("RAW_MSG"));
> return standardEvent;
> }
> });
> DataStream<BeanField> kafkaData1 = kafkaData.map( new
> dealMapFunction(ruleParse));
> EnvironmentSettings blinkStreamSettings =
> EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment blinkStreamTableEnv =
> StreamTableEnvironment.create(env, blinkStreamSettings);
> Table inputTable =
> blinkStreamTableEnv.fromDataStream(kafkaData1,
> Schema.newBuilder()
> .column("snowId","STRING")
> .column("deviceType","STRING")
>
> .column("createTime","TIMESTAMP_LTZ(3)")
> .watermark("createTime",
> "SOURCE_WATERMARK()")
> .build());
> blinkStreamTableEnv.createTemporaryView("InputTable",
> inputTable);
> // Table resultTable = blinkStreamTableEnv.sqlQuery("SELECT * FROM
> InputTable");
> // blinkStreamTableEnv.toRetractStream(resultTable,
> BeanField.class).print("query==");
> String querySQL0 = "select deviceType,count(1) from InputTable
> where deviceType='TDA' group by TUMBLE(createTime, INTERVAL '5'
> MINUTE),deviceType";
> String querySQL1 = "select * from (select deviceType,count(1)
> from InputTable where deviceType='TDA' group by TUMBLE(createTime, INTERVAL
> '5' MINUTE),deviceType) a,(select deviceType,count(1) from InputTable where
> deviceType='TDA' group by TUMBLE(createTime, INTERVAL '5' MINUTE),deviceType)
> b";
> String querySQL2 = "select deviceType,count(1) from InputTable
> where deviceType='OSM' group by TUMBLE(createTime, INTERVAL '5'
> MINUTE),deviceType ";
> String querySQL3 = "select window_start,
> window_end,deviceType,snowId from TABLE(\n" +
> " TUMBLE(TABLE InputTable,
> DESCRIPTOR(createTime), INTERVAL '10' MINUTES))" +
> " where deviceType='TDA' GROUP BY window_start,
> window_end,deviceType,snowId";
> String querySQL4 = "select b.* from (select deviceType,snowId
> from TABLE(\n" +
> "TUMBLE(TABLE InputTable,
> DESCRIPTOR(createTime), INTERVAL '10' MINUTES))" +
> " where deviceType='TDA' GROUP BY window_start,
> window_end,deviceType,snowId) a,(SELECT * FROM InputTable) b" +
> " where a.snowId =b.snowId";
> System.out.println(querySQL4);
> Table table2 = blinkStreamTableEnv.sqlQuery(querySQL4);
> blinkStreamTableEnv.toRetractStream(table2,
> BeanField.class).print("query2==");
> //
> // String sql = "select * from (select deviceType,count(1) from
> InputTable where deviceType='TDA' group by TUMBLE(createTime, INTERVAL '5'
> MINUTE),deviceType) a,(select deviceType,count(1) from InputTable where
> deviceType='IDS' group by TUMBLE(createTime, INTERVAL '5' MINUTE),deviceType)
> b,(select deviceType,count(1) from InputTable where deviceType='IDS' group by
> TUMBLE(createTime, INTERVAL '5' MINUTE),deviceType) c";
> // Table resultTableIps= blinkStreamTableEnv.sqlQuery(sql);
> // blinkStreamTableEnv.toRetractStream(resultTableIps,
> Row.class).print("queryIps==");
> env.execute("Broadcast test kafka");
> }
> Reporter: wangbaohua
> Priority: Major
>
> Rowtime attributes must not be in the input rows of a regular join. As a
> workaround you can cast the time attributes of input tables to TIMESTAMP
> before.
> Please check the documentation for the set of currently supported SQL
> features.
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:82)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> at
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:439)
> at
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:528)
> at
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:517)
> at rete.ReteDemo4.test(ReteDemo4.java:478)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must
> not be in the input rows of a regular join. As a workaround you can cast the
> time attributes of input tables to TIMESTAMP before.
> at
> org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalJoinRule.matches(StreamPhysicalJoinRule.scala:79)
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:284)
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411)
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411)
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:268)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:985)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1245)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
> at
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
> at
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:486)
> at
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:309)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69)
> ... 49 more
> Process finished with exit code -1
--
This message was sent by Atlassian Jira
(v8.20.1#820001)