YUJIANBO created FLINK-24224:
--------------------------------

             Summary: Table to stream: only a Row datatype stream works with 
CEP; POJO, Map and JSONObject datatype streams do not, although any datatype 
stream works with CEP when using only the DataStream API
                 Key: FLINK-24224
                 URL: https://issues.apache.org/jira/browse/FLINK-24224
             Project: Flink
          Issue Type: Bug
          Components: Library / CEP
    Affects Versions: 1.13.2, 1.12.0, 1.11.2
            Reporter: YUJIANBO


 

1、Problem: After converting a *Table to a stream*, only a *Row* datatype stream works with *CEP*; POJO, Map and JSONObject datatype streams produce no CEP output and no error message. Any datatype stream works with CEP when using only the DataStream API.

2、Version: I have tried 3 versions: 1.11.2, 1.12.0 and 1.13.2.

3、Code:

(1) *Table to Stream to CEP* (only the Row datatype is OK; with any other datatype, CEP prints no data and there is no error message)
{code:java}
tableEnv.executeSql(creat_kafka_source);
tableEnv.executeSql(calculateSql);

Table tb = tableEnv.from("calculateSql");
String[] fieldNames = tb.getSchema().getFieldNames();
DataType[] fieldDataTypes = tb.getSchema().getFieldDataTypes();

KeyedStream<JSONObject, String> ds = tableEnv
        .toAppendStream(tb, Row.class)
        .map(new RichMapFunction<Row, JSONObject>() {
            Map<String, Object> map = new HashMap<>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                if (null == map) {
                    map = new HashMap<>();
                }
            }

            @Override
            public JSONObject map(Row value) throws Exception {
                // Put each field name and value of the Row into the map
                RowParseUtil.setFieldAndValue2Map(map, fieldDataTypes, fieldNames, value);
                JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(map));
                map.clear();
                return jsonObject;
            }
        })
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(JSONObject element) {
                return element.getLongValue("wStart") * 1000;
            }
        }).keyBy(x -> x.getString("x_forwarded_for"));
// the keyed stream prints data as expected
ds.print();

Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("begin")
        .where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                log.info("===================>" + value);
                return true;
            }
        }).timesOrMore(1).within(Time.seconds(10));

PatternStream<JSONObject> patternStream = CEP.pattern(ds, pattern);
// no data is printed here (and no error is reported)
patternStream.process(new PatternProcessFunction<JSONObject, String>() {
    @Override
    public void processMatch(Map<String, List<JSONObject>> match, Context ctx, Collector<String> out) throws Exception {
        out.collect("==========>>>>>>>" + match.toString());
    }
}).print();


{code}
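The snippet above references a helper {{RowParseUtil.setFieldAndValue2Map}} that is not included in this report. A minimal sketch of what such a helper is assumed to do (copy each Row field into the map under its schema field name; the class below is a hypothetical reconstruction, not the actual utility):
{code:java}
import java.util.Map;

import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

// Hypothetical reconstruction of the helper used above, only to make the example self-contained.
public class RowParseUtil {
    // Copies every field of the Row into the map, keyed by its schema field name.
    public static void setFieldAndValue2Map(Map<String, Object> map,
                                            DataType[] fieldDataTypes,
                                            String[] fieldNames,
                                            Row value) {
        for (int i = 0; i < fieldNames.length; i++) {
            map.put(fieldNames[i], value.getField(i));
        }
    }
}
{code}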
(2) *Only Stream API to CEP* (any datatype is OK)
{code:java}
Properties proPs = kafkaUtil.getReceiveKfkProPs(receive_brokers, groupid);
FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer =
        new FlinkKafkaConsumer<>(receive_topic, new KafkaRecordSchema(), proPs);
consumer.setStartFromEarliest();

SingleOutputStreamOperator<JSONObject> input = env.addSource(consumer)
        .map(x -> {
            return JSON.parseObject(x.value());
        })
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.milliseconds(10)) {
            @Override
            public long extractTimestamp(JSONObject element) {
                return element.getLongValue("ts");
            }
        })
        .keyBy(x -> x.getString("x_forwarded_for")+x.getString("request_uri"))
        .timeWindow(Time.seconds(1)).apply(new WindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<JSONObject> input, Collector<JSONObject> out) throws Exception {
                Iterator<JSONObject> iterator = input.iterator();
                ArrayList<JSONObject> list = new ArrayList<>();
                int n = 0;
                while (iterator.hasNext()) {
                    n++;
                    JSONObject next = iterator.next();
                    list.add(next);
                }
                JSONObject jsonObject = list.get(0);
                jsonObject.put("ct",n);
                jsonObject.remove("ts");
                out.collect(jsonObject);
            }
        });

input.print();

// this works: matches are printed
Pattern<JSONObject, JSONObject> minInterval = Pattern
        .<JSONObject>begin("begin").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject jsonObject) throws Exception {
                return true;
            }
        }).timesOrMore(1).within(Time.seconds(10));

PatternStream<JSONObject> pattern = CEP.pattern(input, minInterval);
pattern.process(new PatternProcessFunction<JSONObject, String>() {
    @Override
    public void processMatch(Map<String, List<JSONObject>> map, Context context, Collector<String> out) throws Exception {
        out.collect("这个用户有嫌疑:====================>" + map.toString());
    }
}).print();
{code}
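For completeness, {{KafkaRecordSchema}} in the snippet above is a custom class that is not part of this report. Assuming it is a {{KafkaDeserializationSchema}} that simply hands the Kafka record through with key and value decoded as UTF-8 strings, a minimal sketch could look like:
{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical stand-in for the KafkaRecordSchema used above.
public class KafkaRecordSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {

    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        // The stream is unbounded.
        return false;
    }

    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) {
        // Decode key and value as UTF-8 strings and keep topic/partition/offset.
        String key = record.key() == null ? null : new String(record.key(), StandardCharsets.UTF_8);
        String value = record.value() == null ? null : new String(record.value(), StandardCharsets.UTF_8);
        return new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), key, value);
    }

    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {});
    }
}
{code}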
 


