[
https://issues.apache.org/jira/browse/FLINK-24224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17412366#comment-17412366
]
YUJIANBO commented on FLINK-24224:
----------------------------------
[~dwysakowicz]:
Hello, I am very eager to know whether this is a bug. Do you have any
ideas that could help me solve it?
Thank you very much.
> Table to stream, only the row datatype Stream works on CEP, and other POJOs,
> maps and jsonobjects datatype streams do not work, but any datatype stream to
> CEP can work by only stream api .
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-24224
> URL: https://issues.apache.org/jira/browse/FLINK-24224
> Project: Flink
> Issue Type: Bug
> Components: Library / CEP
> Affects Versions: 1.11.2, 1.12.0, 1.13.2
> Reporter: YUJIANBO
> Priority: Major
>
>
> 1. Problem: when going from *Table to stream*, only a stream of the *ROW*
> data type works with *CEP*. Streams of other data types (POJOs, maps,
> JSONObjects) do not work. With the stream API alone, a stream of any data
> type works with CEP.
> 2. Versions: I have tried three versions: 1.11.2, 1.12.0, and 1.13.2.
> 3. Code:
> (1) Table to stream to CEP (only the Row data type works; with any other
> data type the CEP stage prints no data and reports no error)
> {code:java}
> tableEnv.executeSql(creat_kafka_source);
> tableEnv.executeSql(calculateSql);
> Table tb = tableEnv.from("calculateSql");
> String[] fieldNames = tb.getSchema().getFieldNames();
> DataType[] fieldDataTypes = tb.getSchema().getFieldDataTypes();
> KeyedStream<JSONObject, String> ds = tableEnv
>     .toAppendStream(tb, Row.class)
>     .map(new RichMapFunction<Row, JSONObject>() {
>         Map<String, Object> map = new HashMap<>();
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             super.open(parameters);
>             if (null == map) {
>                 map = new HashMap<>();
>             }
>         }
>         @Override
>         public JSONObject map(Row value) throws Exception {
>             // add the row's field names (keys) and values to the map
>             RowParseUtil.setFieldAndValue2Map(map, fieldDataTypes, fieldNames, value);
>             JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(map));
>             map.clear();
>             return jsonObject;
>         }
>     })
>     .assignTimestampsAndWatermarks(
>         new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(0)) {
>             @Override
>             public long extractTimestamp(JSONObject element) {
>                 return element.getLongValue("wStart") * 1000;
>             }
>         })
>     .keyBy(x -> x.getString("x_forwarded_for"));
> // this prints data
> ds.print();
> Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("begin")
>     .where(new SimpleCondition<JSONObject>() {
>         @Override
>         public boolean filter(JSONObject value) throws Exception {
>             log.info("===================>" + value);
>             return true;
>         }
>     }).timesOrMore(1).within(Time.seconds(10));
> PatternStream<JSONObject> patternStream = CEP.pattern(ds, pattern);
> // this prints no data
> patternStream.process(new PatternProcessFunction<JSONObject, String>() {
>     @Override
>     public void processMatch(Map<String, List<JSONObject>> match, Context ctx,
>             Collector<String> out) throws Exception {
>         out.collect("==========>>>>>>>" + match.toString());
>     }
> }).print();
> {code}
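For readers without access to the helper class, here is a minimal sketch of what the map step in listing (1) appears to do before the JSON round trip. `RowParseUtil.setFieldAndValue2Map` is not shown in the report, so `toMap` below is a hypothetical stand-in: it takes a plain `Object[]` in place of Flink's `Row`, ignores the data types, and pairs each field name with the value at the same position. The sample values are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RowToMapSketch {

    // Hypothetical stand-in for RowParseUtil.setFieldAndValue2Map (not shown in the report).
    // Pairs fieldNames[i] with rowValues[i]; rowValues stands in for a Flink Row.
    static Map<String, Object> toMap(String[] fieldNames, Object[] rowValues) {
        if (fieldNames.length != rowValues.length) {
            throw new IllegalArgumentException("field names and row arity differ");
        }
        // A fresh map per record, so no state is shared between calls.
        Map<String, Object> map = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            map.put(fieldNames[i], rowValues[i]);
        }
        return map;
    }

    public static void main(String[] args) {
        // Made-up sample record using two field names that appear in listing (1).
        String[] names = {"x_forwarded_for", "wStart"};
        Object[] values = {"10.0.0.1", 1631000000L};

        Map<String, Object> event = toMap(names, values);
        // Same conversion as extractTimestamp in listing (1): wStart * 1000.
        long eventTimeMillis = ((Number) event.get("wStart")).longValue() * 1000;
        System.out.println(event + " -> event time " + eventTimeMillis);
    }
}
```

Building a new map for every record is a design choice of this sketch. The listing instead reuses one map field and clears it after each record, which is also safe there because the JSON round trip copies the contents first.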
> (2) *Only Stream API to CEP* (works with any data type)
> {code:java}
> Properties proPs = kafkaUtil.getReceiveKfkProPs(receive_brokers, groupid);
> FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer =
>     new FlinkKafkaConsumer<>(receive_topic, new KafkaRecordSchema(), proPs);
> consumer.setStartFromEarliest();
> SingleOutputStreamOperator<JSONObject> input = env.addSource(consumer)
>     .map(x -> {
>         return JSON.parseObject(x.value());
>     })
>     .assignTimestampsAndWatermarks(
>         new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.milliseconds(10)) {
>             @Override
>             public long extractTimestamp(JSONObject element) {
>                 return element.getLongValue("ts");
>             }
>         })
>     .keyBy(x -> x.getString("x_forwarded_for") + x.getString("request_uri"))
>     .timeWindow(Time.seconds(1))
>     .apply(new WindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
>         @Override
>         public void apply(String s, TimeWindow window,
>                 Iterable<JSONObject> input, Collector<JSONObject> out) throws Exception {
>             Iterator<JSONObject> iterator = input.iterator();
>             ArrayList<JSONObject> list = new ArrayList<>();
>             int n = 0;
>             while (iterator.hasNext()) {
>                 n++;
>                 JSONObject next = iterator.next();
>                 list.add(next);
>             }
>             JSONObject jsonObject = list.get(0);
>             jsonObject.put("ct", n);
>             jsonObject.remove("ts");
>             out.collect(jsonObject);
>         }
>     });
> input.print();
> // this works
> Pattern<JSONObject, JSONObject> minInterval = Pattern
>     .<JSONObject>begin("begin").where(new SimpleCondition<JSONObject>() {
>         @Override
>         public boolean filter(JSONObject jsonObject) throws Exception {
>             return true;
>         }
>     }).timesOrMore(1).within(Time.seconds(10));
> PatternStream<JSONObject> pattern = CEP.pattern(input, minInterval);
> pattern.process(new PatternProcessFunction<JSONObject, String>() {
>     @Override
>     public void processMatch(Map<String, List<JSONObject>> map, Context context,
>             Collector<String> out) throws Exception {
>         out.collect("This user is suspicious:====================>" + map.toString());
>     }
> }).print();
> {code}
>
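The window function in listing (2) is easier to follow when isolated from Flink. The sketch below reproduces its logic on plain Java maps under stated assumptions: `WindowCountSketch` and `aggregate` are hypothetical names, and `Map<String, Object>` stands in for `JSONObject`. It counts the elements of one window, takes the first element, adds the count as `ct`, and drops `ts`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class WindowCountSketch {

    // Mirrors apply(...) in listing (2): count the window's elements and emit
    // the first one with "ct" added and "ts" removed.
    static Map<String, Object> aggregate(Iterable<Map<String, Object>> window) {
        Map<String, Object> first = null;
        int n = 0;
        for (Map<String, Object> element : window) {
            if (first == null) {
                first = element;
            }
            n++;
        }
        if (first == null) {
            throw new IllegalArgumentException("empty window");
        }
        // Copy instead of mutating the input; the listing mutates the first element in place.
        Map<String, Object> result = new HashMap<>(first);
        result.put("ct", n);
        result.remove("ts");
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample events sharing one key within one window.
        List<Map<String, Object>> window = new ArrayList<>();
        for (long ts = 1000; ts < 1003; ts++) {
            Map<String, Object> event = new HashMap<>();
            event.put("x_forwarded_for", "10.0.0.1");
            event.put("ts", ts);
            window.add(event);
        }
        System.out.println(aggregate(window));
    }
}
```

Note that the emitted record no longer carries a `ts` field, so anything downstream of the window in listing (2) has to rely on the timestamp Flink attaches to window results rather than on the payload.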