[ 
https://issues.apache.org/jira/browse/FLINK-30935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ran Tao updated FLINK-30935:
----------------------------
    Description: 
Many of the current Kafka serializer implementations do not perform a version 
check, while other implementations of SimpleVersionedSerializer do.

We can add the check, as many other connectors' implementations do, to guard 
against incompatible or corrupt state when restoring from a checkpoint.

 
{code:java}
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
} {code}
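
The proposed check could look roughly like the sketch below. It is only an illustration, not the actual patch: SplitSketch and SplitSerializerSketch are hypothetical stand-ins for KafkaPartitionSplit and its serializer so that the example compiles without Flink or Kafka on the classpath, and the value of CURRENT_VERSION and the exception message are assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for KafkaPartitionSplit (not the real Flink class).
final class SplitSketch {
    final String topic;
    final int partition;
    final long offset;
    final long stoppingOffset;

    SplitSketch(String topic, int partition, long offset, long stoppingOffset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.stoppingOffset = stoppingOffset;
    }
}

// Hypothetical serializer that mirrors the getVersion()/deserialize(int, byte[])
// shape shown in the issue, with a version check added.
final class SplitSerializerSketch {
    private static final int CURRENT_VERSION = 0; // assumed value

    int getVersion() {
        return CURRENT_VERSION;
    }

    byte[] serialize(SplitSketch split) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {
            out.writeUTF(split.topic);
            out.writeInt(split.partition);
            out.writeLong(split.offset);
            out.writeLong(split.stoppingOffset);
            out.flush();
            return baos.toByteArray();
        }
    }

    SplitSketch deserialize(int version, byte[] serialized) throws IOException {
        // Fail fast on an unknown version instead of misreading the bytes.
        if (version != CURRENT_VERSION) {
            throw new IOException("Unrecognized version or corrupt state: " + version);
        }
        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                DataInputStream in = new DataInputStream(bais)) {
            String topic = in.readUTF();
            int partition = in.readInt();
            long offset = in.readLong();
            long stoppingOffset = in.readLong();
            return new SplitSketch(topic, partition, offset, stoppingOffset);
        }
    }
}
```

With this shape, restoring state written under a different version raises an IOException up front rather than producing a split with garbage fields.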
 

 

  was:
{code:java}
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
} {code}
Many of the current Kafka serializer implementations do not perform a version 
check. I think we can add it, as many other connectors' implementations do, to 
guard against incompatible or corrupt state when restoring from a checkpoint.


> Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
> --------------------------------------------------------------------------
>
>                 Key: FLINK-30935
>                 URL: https://issues.apache.org/jira/browse/FLINK-30935
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>            Reporter: Ran Tao
>            Priority: Major
>              Labels: pull-request-available
>
> Many of the current Kafka serializer implementations do not perform a version 
> check, while other implementations of SimpleVersionedSerializer do.
> We can add the check, as many other connectors' implementations do, to guard 
> against incompatible or corrupt state when restoring from a checkpoint.
>  
> {code:java}
> @Override
> public int getVersion() {
>     return CURRENT_VERSION;
> }
> @Override
> public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
>     try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
>             DataInputStream in = new DataInputStream(bais)) {
>         String topic = in.readUTF();
>         int partition = in.readInt();
>         long offset = in.readLong();
>         long stoppingOffset = in.readLong();
>         return new KafkaPartitionSplit(
>                 new TopicPartition(topic, partition), offset, stoppingOffset);
>     }
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
