[
https://issues.apache.org/jira/browse/FLINK-24224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
YUJIANBO updated FLINK-24224:
-----------------------------
Description:
1、Problem: when a *Table* is converted to a DataStream and fed into *CEP*, the pattern only produces matches if the events are of the *Row* datatype; with other event datatypes (POJO, Map, JSONObject) CEP produces no output. When the same data is consumed with the DataStream API only (no Table conversion), CEP works with any event datatype.
2、Version: reproduced on 1.11.2, 1.12.0 and 1.13.2.
3、Code:
(1) Table to Stream to CEP (only the Row datatype works; with any other datatype CEP prints no data and reports no error):
{code:java}
tableEnv.executeSql(creat_kafka_source);
tableEnv.executeSql(calculateSql);
Table tb = tableEnv.from("calculateSql");
String[] fieldNames = tb.getSchema().getFieldNames();
DataType[] fieldDataTypes = tb.getSchema().getFieldDataTypes();

KeyedStream<JSONObject, String> ds = tableEnv
        .toAppendStream(tb, Row.class)
        .map(new RichMapFunction<Row, JSONObject>() {
            Map<String, Object> map = new HashMap<>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                if (null == map) {
                    map = new HashMap<>();
                }
            }

            @Override
            public JSONObject map(Row value) throws Exception {
                // put the row's field names and values into the map
                RowParseUtil.setFieldAndValue2Map(map, fieldDataTypes, fieldNames, value);
                JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(map));
                map.clear();
                return jsonObject;
            }
        })
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(JSONObject element) {
                        return element.getLongValue("wStart") * 1000;
                    }
                })
        .keyBy(x -> x.getString("x_forwarded_for"));

// this prints data as expected
ds.print();

Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("begin")
        .where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                log.info("===================>" + value);
                return true;
            }
        }).timesOrMore(1).within(Time.seconds(10));

PatternStream<JSONObject> patternStream = CEP.pattern(ds, pattern);

// this prints nothing (no matches) and no error message
patternStream.process(new PatternProcessFunction<JSONObject, String>() {
    @Override
    public void processMatch(Map<String, List<JSONObject>> match, Context ctx,
                             Collector<String> out) throws Exception {
        out.collect("==========>>>>>>>" + match.toString());
    }
}).print();
{code}
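For reference, a minimal sketch of the Row-based variant of the same pipeline, which (as described above) does produce matches. The field positions used for the event-time column and the key are assumptions for illustration only, not taken from the original job:
{code:java}
// Hedged sketch of the Row-based variant that is reported to work.
// The field positions of "wStart" (index 0) and "x_forwarded_for" (index 1)
// are assumptions made for illustration.
KeyedStream<Row, String> rowDs = tableEnv
        .toAppendStream(tb, Row.class)
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(Row element) {
                        return ((Long) element.getField(0)) * 1000; // assumed "wStart" column
                    }
                })
        .keyBy(row -> (String) row.getField(1)); // assumed "x_forwarded_for" column

Pattern<Row, Row> rowPattern = Pattern.<Row>begin("begin")
        .where(new SimpleCondition<Row>() {
            @Override
            public boolean filter(Row value) throws Exception {
                return true;
            }
        }).timesOrMore(1).within(Time.seconds(10));

// With Row events the matches are printed.
CEP.pattern(rowDs, rowPattern)
        .process(new PatternProcessFunction<Row, String>() {
            @Override
            public void processMatch(Map<String, List<Row>> match, Context ctx,
                                     Collector<String> out) throws Exception {
                out.collect("==========>>>>>>>" + match);
            }
        }).print();
{code}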
(2) *Stream API only to CEP* (works with any datatype):
{code:java}
Properties proPs = kafkaUtil.getReceiveKfkProPs(receive_brokers, groupid);
FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer =
        new FlinkKafkaConsumer<>(receive_topic, new KafkaRecordSchema(), proPs);
consumer.setStartFromEarliest();

SingleOutputStreamOperator<JSONObject> input = env.addSource(consumer)
        .map(x -> JSON.parseObject(x.value()))
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.milliseconds(10)) {
                    @Override
                    public long extractTimestamp(JSONObject element) {
                        return element.getLongValue("ts");
                    }
                })
        .keyBy(x -> x.getString("x_forwarded_for") + x.getString("request_uri"))
        .timeWindow(Time.seconds(1))
        .apply(new WindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<JSONObject> input,
                              Collector<JSONObject> out) throws Exception {
                Iterator<JSONObject> iterator = input.iterator();
                ArrayList<JSONObject> list = new ArrayList<>();
                int n = 0;
                while (iterator.hasNext()) {
                    n++;
                    list.add(iterator.next());
                }
                JSONObject jsonObject = list.get(0);
                jsonObject.put("ct", n);
                jsonObject.remove("ts");
                out.collect(jsonObject);
            }
        });

input.print();

// this variant works: CEP matches are printed
Pattern<JSONObject, JSONObject> minInterval = Pattern
        .<JSONObject>begin("begin").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject jsonObject) throws Exception {
                return true;
            }
        }).timesOrMore(1).within(Time.seconds(10));

PatternStream<JSONObject> pattern = CEP.pattern(input, minInterval);
pattern.process(new PatternProcessFunction<JSONObject, String>() {
    @Override
    public void processMatch(Map<String, List<JSONObject>> map, Context context,
                             Collector<String> out) throws Exception {
        out.collect("This user is suspicious: ====================>" + map.toString());
    }
}).print();
{code}
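Since the pattern in snippet (1) uses event time, one thing worth checking is whether the watermark actually advances on the keyed stream that feeds CEP there, because CEP buffers events and only evaluates them once the watermark passes their timestamps. A debugging sketch (not part of the original job) that reuses the ds stream from snippet (1):
{code:java}
// Hedged debugging sketch (not from the original report): print each element's
// timestamp together with the current watermark to see whether event time
// advances on the stream that feeds CEP in snippet (1).
ds.process(new KeyedProcessFunction<String, JSONObject, JSONObject>() {
    @Override
    public void processElement(JSONObject value, Context ctx,
                               Collector<JSONObject> out) throws Exception {
        log.info("wStart={}, element timestamp={}, current watermark={}",
                value.getLongValue("wStart"),
                ctx.timestamp(),
                ctx.timerService().currentWatermark());
        out.collect(value);
    }
}).print();
{code}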
> Table converted to stream for CEP: only the Row datatype stream works with CEP;
> POJO, Map and JSONObject streams do not, although any datatype works when the
> stream is built with the DataStream API only.
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-24224
> URL: https://issues.apache.org/jira/browse/FLINK-24224
> Project: Flink
> Issue Type: Bug
> Components: Library / CEP
> Affects Versions: 1.11.2, 1.12.0, 1.13.2
> Reporter: YUJIANBO
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)