[ 
https://issues.apache.org/jira/browse/FLINK-36235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eason Ye updated FLINK-36235:
-----------------------------
    Description: 
Ignore emitting the null rowData when deserializing the message failed.
PulsarDeserializationSchemaWrapper.java:
{code:java}
@Override
public void deserialize(Message<byte[]> message, Collector<T> out) throws 
Exception {
  byte[] bytes = message.getData();
  T instance = deserializationSchema.deserialize(bytes);
  out.collect(instance);
} {code}
 

Change to: 
 
{code:java}
@Override
public void deserialize(Message<byte[]> message, Collector<T> out) throws 
Exception {
  byte[] bytes = message.getData();
  T instance = deserializationSchema.deserialize(bytes);
  if (instance != null) { 
    out.collect(instance);
  }
} {code}

  was:
Ignore emitting the null rowData when deserializing the message failed.
PulsarDeserializationSchemaWrapper.java:
{code:java}
@Override
public void deserialize(Message<byte[]> message, Collector<T> out) throws 
Exception {
  byte[] bytes = message.getData();
  T instance = deserializationSchema.deserialize(bytes);  out.collect(instance);
} {code}
 

Change to: 
 
{code:java}
@Override
public void deserialize(Message<byte[]> message, Collector<T> out) throws 
Exception {
  byte[] bytes = message.getData();
  T instance = deserializationSchema.deserialize(bytes);
  if (instance != null) { 
    out.collect(instance);
  }
} {code}


> [Pulsar Connector] Ignore to emit the null rowData when deserialized message 
> failed
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-36235
>                 URL: https://issues.apache.org/jira/browse/FLINK-36235
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Pulsar
>    Affects Versions: pulsar-4.1.0
>            Reporter: Eason Ye
>            Priority: Major
>             Fix For: pulsar-4.2.0
>
>
> Ignore emitting the null rowData when deserializing the message failed.
> PulsarDeserializationSchemaWrapper.java:
> {code:java}
> @Override
> public void deserialize(Message<byte[]> message, Collector<T> out) throws 
> Exception {
>   byte[] bytes = message.getData();
>   T instance = deserializationSchema.deserialize(bytes);
>   out.collect(instance);
> } {code}
>  
> Change to: 
>  
> {code:java}
> @Override
> public void deserialize(Message<byte[]> message, Collector<T> out) throws 
> Exception {
>   byte[] bytes = message.getData();
>   T instance = deserializationSchema.deserialize(bytes);
>   if (instance != null) { 
>     out.collect(instance);
>   }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to