Zekun Yu created FLINK-19332:
--------------------------------
Summary: Special characters issue using Kinesis Data Analytics for
Apache Flink
Key: FLINK-19332
URL: https://issues.apache.org/jira/browse/FLINK-19332
Project: Flink
Issue Type: Bug
Components: Connectors / Kinesis
Affects Versions: 1.8.2
Reporter: Zekun Yu
Fix For: 1.8.2
Hi there,
I am encountering an issue with special characters while using Kinesis Data
Analytics for Apache Flink (KDA).
Our KDA application processes data and writes its output to a Kinesis stream.
A Lambda function subscribes to that Kinesis stream and reads its records.
In the KDA application I am using the Flink Kinesis connector
("org.apache.flink.streaming.connectors.kinesis").
For a given key, our KDA application emits only a single record to the Kinesis
sink via "collector.collect()" (details below).
Most of the time, the record received by the Lambda function looks correct.
Occasionally, however, when two records are sent to the Kinesis sink via
"collector.collect()" at the same time, *we notice that the two records are
combined and that the record received by the Lambda function contains some
special characters*.
Below are some technical details:
The KDA application does not use any "TimeWindow()"; it only uses "keyBy()" on
a state key:
    // ... (the start of this chain is not shown in the report)
    ).returns(MatchedDataForAlarm.class)
     .keyBy(MatchedDataForAlarm::getStateKey)
     .connect(ruleBroadcastStream)
     .process(new MetricProcess())
     .addSink(kinesis);
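The construction of the `kinesis` sink is not shown above. FlinkKinesisProducer is built on the Kinesis Producer Library (KPL), and the KPL aggregates several user records into one Kinesis record by default. Assuming (this is not confirmed in the report) that aggregation is what combines the two records, the producer properties could turn it off. The sketch below uses only the JDK; the region value is a placeholder and the helper name `producerConfig` is hypothetical.

```java
import java.util.Properties;

public class KinesisSinkConfigSketch {

    // Hypothetical helper: builds the Properties handed to the Kinesis sink.
    // The region is a placeholder; disabling aggregation is an assumption-driven choice.
    static Properties producerConfig(String region) {
        Properties props = new Properties();
        props.setProperty("aws.region", region);
        // KPL setting: with "false", each collected element becomes its own Kinesis record.
        props.setProperty("AggregationEnabled", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerConfig("us-east-1");
        // With the Flink connector on the classpath, these would be passed as
        // new FlinkKinesisProducer<>(schema, props), followed by
        // setDefaultStream(...) and setDefaultPartition(...).
        System.out.println(props.getProperty("AggregationEnabled"));
    }
}
```

Disabling aggregation trades fewer, larger Kinesis records for one record per element, which may raise PutRecords usage on the stream.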
"MetricProcess" extends "KeyedBroadcastProcessFunction" and overrides
"processElement", which emits its output via collector.collect():
    @Override
    public void processElement(MatchedDataForAlarm input, ReadOnlyContext ctx,
            Collector<MatchedDataForAlarm> collector) throws Exception {
        // ... (method body not shown in the report; it calls collector.collect(...))
    }
We have our own AEMMatchedDataForAlarmSchemaSerialization class, which
implements KinesisSerializationSchema<MatchedDataForAlarm>. It simply converts
a "MatchedDataForAlarm" object to a JSON string using Gson and then wraps the
string's bytes in a ByteBuffer:
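    // Needs: com.google.gson.Gson, java.nio.ByteBuffer, java.nio.charset.StandardCharsets
    @Override
    public ByteBuffer serialize(MatchedDataForAlarm matchedDataForAlarm) {
        Gson gson = new Gson();
        String result = gson.toJson(matchedDataForAlarm);
        log.info("Alarm record sent to Kinesis stream: {}", result);
        // Encode explicitly as UTF-8 instead of relying on the JVM's default charset
        return ByteBuffer.wrap(result.getBytes(StandardCharsets.UTF_8));
    }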
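A JDK-only sketch of the serialization step, assuming a hypothetical `Alarm` class that carries only the `inAlarmState` field visible in the logs, with hand-written JSON standing in for Gson. It shows that each `serialize` call turns one element into one self-contained JSON payload with no framing bytes, which suggests (but does not prove) that the non-printable characters are added after this step rather than by the schema.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SerializeSketch {

    // Hypothetical stand-in for MatchedDataForAlarm with only the field seen in the logs.
    static final class Alarm {
        final boolean inAlarmState;

        Alarm(boolean inAlarmState) {
            this.inAlarmState = inAlarmState;
        }
    }

    // Same shape as serialize(): one element in, one JSON payload out (Gson replaced by hand).
    static ByteBuffer serialize(Alarm alarm) {
        String json = "{\"inAlarmState\":" + alarm.inAlarmState + "}";
        return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
    }

    // Decodes a payload the way a consumer would if it arrived unmodified.
    static String decode(ByteBuffer payload) {
        byte[] bytes = new byte[payload.remaining()];
        payload.duplicate().get(bytes); // duplicate() leaves the caller's buffer position untouched
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer first = serialize(new Alarm(false));
        ByteBuffer second = serialize(new Alarm(false));
        System.out.println(decode(first));  // {"inAlarmState":false}
        System.out.println(decode(second)); // {"inAlarmState":false}
    }
}
```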
*Here is the record shown in the Lambda logs when the two records are combined
(in most cases they are received as two separate records):*
????
0??
{ "inAlarmState": false }
??
{ "inAlarmState": false}
e????E????o?N9x
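One possible reading of the dump above, offered as an assumption rather than a diagnosis: the Flink Kinesis producer uses the Kinesis Producer Library (KPL), which aggregates records by default. A KPL aggregated record starts with the four magic bytes 0xF3 0x89 0x9A 0xC2, followed by a protobuf-encoded message and a 16-byte MD5 checksum, and those binary bytes would print as "?" when logged as text. A minimal check the Lambda function could run on the raw record data (the class and method names are hypothetical, and the second input is a synthetic prefix, not a real record):

```java
import java.nio.charset.StandardCharsets;

public class KplAggregationCheck {

    // Magic prefix of the KPL aggregated-record format.
    static final byte[] KPL_MAGIC = {(byte) 0xF3, (byte) 0x89, (byte) 0x9A, (byte) 0xC2};

    // Returns true if the raw Kinesis record data starts with the KPL magic bytes.
    static boolean looksKplAggregated(byte[] data) {
        if (data == null || data.length < KPL_MAGIC.length) {
            return false;
        }
        for (int i = 0; i < KPL_MAGIC.length; i++) {
            if (data[i] != KPL_MAGIC[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] plain = "{\"inAlarmState\":false}".getBytes(StandardCharsets.UTF_8);
        // Synthetic bytes: only the magic prefix plus one extra byte, for illustration.
        byte[] syntheticPrefix = {(byte) 0xF3, (byte) 0x89, (byte) 0x9A, (byte) 0xC2, 0x0A};
        System.out.println(looksKplAggregated(plain));           // false
        System.out.println(looksKplAggregated(syntheticPrefix)); // true
    }
}
```

If real combined records pass this check, the Lambda function would need to de-aggregate them (the awslabs kinesis-aggregation project targets this case), or aggregation would need to be disabled on the producer side.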
I am not sure whether this is a serialization issue or some default behavior of
the Kinesis sink library. It might also be a common mistake on my part that I
am not aware of.
Could anyone help with this problem? I would really appreciate it.
Thanks,
Zekun
--
This message was sent by Atlassian Jira
(v8.3.4#803005)