[
https://issues.apache.org/jira/browse/FLINK-30935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ran Tao updated FLINK-30935:
----------------------------
Description:
Many of the Kafka connector's serializers do not check the version when
deserializing, while other implementations of SimpleVersionedSerializer do.
We can add the check, as many other connectors' implementations do, to guard
against incompatible or corrupt state when restoring from a checkpoint.
{code:java}
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized)
        throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
} {code}
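The snippet above reads the payload without ever looking at the version argument. A minimal sketch of what such a check might look like follows. It is deliberately dependency-free and is not the actual Flink change: the class VersionedSplitSerializerSketch, the nested Split type (a stand-in for KafkaPartitionSplit), the value 0 for CURRENT_VERSION, and the error message are all assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical, dependency-free stand-in for a versioned split serializer. */
final class VersionedSplitSerializerSketch {

    // Assumed value; the issue text does not state the real CURRENT_VERSION.
    static final int CURRENT_VERSION = 0;

    /** Hypothetical stand-in for KafkaPartitionSplit. */
    static final class Split {
        final String topic;
        final int partition;
        final long offset;
        final long stoppingOffset;

        Split(String topic, int partition, long offset, long stoppingOffset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.stoppingOffset = stoppingOffset;
        }
    }

    static int getVersion() {
        return CURRENT_VERSION;
    }

    /** Writes the fields in the same order that deserialize reads them. */
    static byte[] serialize(Split split) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {
            out.writeUTF(split.topic);
            out.writeInt(split.partition);
            out.writeLong(split.offset);
            out.writeLong(split.stoppingOffset);
            out.flush();
            return baos.toByteArray();
        }
    }

    static Split deserialize(int version, byte[] serialized) throws IOException {
        // Reject state written by an unknown serializer version up front,
        // instead of decoding bytes whose layout may differ.
        if (version != CURRENT_VERSION) {
            throw new IOException(
                    "Unrecognized version or corrupt state: " + version
                            + " (expected " + CURRENT_VERSION + ")");
        }
        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                DataInputStream in = new DataInputStream(bais)) {
            String topic = in.readUTF();
            int partition = in.readInt();
            long offset = in.readLong();
            long stoppingOffset = in.readLong();
            return new Split(topic, partition, offset, stoppingOffset);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = serialize(new Split("orders", 3, 42L, 100L));

        // Round trip with the matching version succeeds.
        Split restored = deserialize(getVersion(), bytes);
        System.out.println(restored.topic + "-" + restored.partition);

        // A mismatched version fails fast with an IOException.
        try {
            deserialize(CURRENT_VERSION + 1, bytes);
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing before any bytes are read means a restore from an incompatible checkpoint surfaces as a clear version error rather than as a misleading read failure or a silently wrong split.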
was:
{code:java}
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized)
        throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
} {code}
Many of the Kafka connector's serializers do not perform a version check. I
think we can add one, as many other connectors' implementations do, to guard
against incompatible or corrupt state when restoring from a checkpoint.
> Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
> --------------------------------------------------------------------------
>
> Key: FLINK-30935
> URL: https://issues.apache.org/jira/browse/FLINK-30935
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Reporter: Ran Tao
> Priority: Major
> Labels: pull-request-available