[
https://issues.apache.org/jira/browse/FLINK-36235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Eason Ye updated FLINK-36235:
-----------------------------
Description:
Skip emitting the null rowData when deserializing a message fails.
PulsarDeserializationSchemaWrapper.java:
{code:java}
@Override
public void deserialize(Message<byte[]> message, Collector<T> out) throws Exception {
    byte[] bytes = message.getData();
    T instance = deserializationSchema.deserialize(bytes);
    out.collect(instance);
}
{code}
Change to:
{code:java}
@Override
public void deserialize(Message<byte[]> message, Collector<T> out) throws Exception {
    byte[] bytes = message.getData();
    T instance = deserializationSchema.deserialize(bytes);
    if (instance != null) {
        out.collect(instance);
    }
}
{code}
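For illustration only, here is a minimal, self-contained sketch of the proposed null guard that runs without Flink or Pulsar on the classpath. The class NullSkippingDeserializer and its demo() helper are hypothetical names, not part of the connector; java.util.function.Function stands in for the wrapped DeserializationSchema and java.util.function.Consumer stands in for Collector. It assumes the wrapped schema signals a failed deserialization by returning null rather than throwing.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for PulsarDeserializationSchemaWrapper; not the connector's class.
final class NullSkippingDeserializer<T> {
    // Stand-in for the wrapped DeserializationSchema<T>; assumed to return null on failure.
    private final Function<byte[], T> deserializationSchema;

    NullSkippingDeserializer(Function<byte[], T> deserializationSchema) {
        this.deserializationSchema = deserializationSchema;
    }

    // Consumer<T> stands in for Flink's Collector<T>.
    void deserialize(byte[] bytes, Consumer<T> out) {
        T instance = deserializationSchema.apply(bytes);
        // The proposed change: forward only records that were actually produced.
        if (instance != null) {
            out.accept(instance);
        }
    }

    // Feeds three payloads through the wrapper; the empty one simulates a failed parse.
    static List<String> demo() {
        Function<byte[], String> lenient =
                bytes -> bytes.length == 0 ? null : new String(bytes, StandardCharsets.UTF_8);
        NullSkippingDeserializer<String> wrapper = new NullSkippingDeserializer<>(lenient);

        List<String> collected = new ArrayList<>();
        wrapper.deserialize("a".getBytes(StandardCharsets.UTF_8), collected::add);
        wrapper.deserialize(new byte[0], collected::add); // schema returns null, nothing emitted
        wrapper.deserialize("b".getBytes(StandardCharsets.UTF_8), collected::add);
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, b]
    }
}
```

Without the guard, the second call would pass null to the collector, which is the behaviour this issue proposes to avoid.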
was:
Ignore emitting the null rowData when deserializing the message failed.
PulsarDeserializationSchemaWrapper.java:
{code:java}
@Override
public void deserialize(Message<byte[]> message, Collector<T> out) throws
Exception {
byte[] bytes = message.getData();
T instance = deserializationSchema.deserialize(bytes); out.collect(instance);
} {code}
Change to:
{code:java}
@Override
public void deserialize(Message<byte[]> message, Collector<T> out) throws
Exception {
byte[] bytes = message.getData();
T instance = deserializationSchema.deserialize(bytes);
if (instance != null) {
out.collect(instance);
}
} {code}
> [Pulsar Connector] Ignore to emit the null rowData when deserialized message
> failed
> -----------------------------------------------------------------------------------
>
> Key: FLINK-36235
> URL: https://issues.apache.org/jira/browse/FLINK-36235
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Pulsar
> Affects Versions: pulsar-4.1.0
> Reporter: Eason Ye
> Priority: Major
> Fix For: pulsar-4.2.0
>
>
> Skip emitting the null rowData when deserializing a message fails.
> PulsarDeserializationSchemaWrapper.java:
> {code:java}
> @Override
> public void deserialize(Message<byte[]> message, Collector<T> out) throws Exception {
>     byte[] bytes = message.getData();
>     T instance = deserializationSchema.deserialize(bytes);
>     out.collect(instance);
> }
> {code}
>
> Change to:
>
> {code:java}
> @Override
> public void deserialize(Message<byte[]> message, Collector<T> out) throws Exception {
>     byte[] bytes = message.getData();
>     T instance = deserializationSchema.deserialize(bytes);
>     if (instance != null) {
>         out.collect(instance);
>     }
> }
> {code}