[
https://issues.apache.org/jira/browse/FLINK-31483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ruibin Xing updated FLINK-31483:
--------------------------------
Description:
Currently, the Flink Kafka Connector does not support split deletion; it is
left as a
[TODO|https://github.com/apache/flink-connector-kafka/blob/9f72be91f8abdfc9b5e8fa46d15dee3f83e71332/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305].
I want to add this feature by doing these steps:
1. Add SplitsDeletion event to flink-connector-base, which currently only has
SplitsAddition.
2. Add a `deleteSplits` method in SplitEnumeratorContext, so it can send a
SplitsDeletion event to the source operator. To maintain compatibility, a
default empty implementation for this method will be added.
3. Make SourceOperator handle the SplitsDeletion event, notifying the
SourceReader to delete splits.
4. Create a deleteSplits method in SourceReader to remove splits, which includes
removing them from the split state and stopping the SourceReader from reading
the deleted splits.
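A minimal sketch of how the four steps could fit together. It uses simplified
stand-in classes, not the real Flink types: SplitsDeletion, deleteSplits,
SketchReader, SketchOperator, and ForwardingContext are all hypothetical names
chosen for illustration, and splits are reduced to plain string IDs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for illustration only; none of these are the real Flink classes.
final class SplitDeletionSketch {

    // Step 1: hypothetical event mirroring SplitsAddition, carrying the IDs of splits to drop.
    static final class SplitsDeletion {
        final List<String> splitIds;

        SplitsDeletion(List<String> splitIds) {
            this.splitIds = new ArrayList<>(splitIds);
        }
    }

    // Step 2: stand-in for the enumerator context. The default no-op keeps
    // existing implementations compiling, as the proposal suggests.
    interface EnumeratorContext {
        default void deleteSplits(List<String> splitIds) {
            // intentionally empty
        }
    }

    // Step 4: stand-in reader that tracks per-split state (here, a fake offset).
    static final class SketchReader {
        private final Map<String, Long> splitState = new LinkedHashMap<>();

        void addSplits(List<String> splitIds) {
            for (String id : splitIds) {
                splitState.putIfAbsent(id, 0L);
            }
        }

        // Removing the entry drops the split's state; a split without state is no longer read.
        void deleteSplits(List<String> splitIds) {
            for (String id : splitIds) {
                splitState.remove(id);
            }
        }

        List<String> assignedSplits() {
            return new ArrayList<>(splitState.keySet());
        }
    }

    // Step 3: stand-in operator that routes the deletion event to the reader.
    static final class SketchOperator {
        private final SketchReader reader;

        SketchOperator(SketchReader reader) {
            this.reader = reader;
        }

        void handleEvent(Object event) {
            if (event instanceof SplitsDeletion) {
                reader.deleteSplits(((SplitsDeletion) event).splitIds);
            }
        }
    }

    // A context that overrides the default and forwards a SplitsDeletion to one operator.
    static final class ForwardingContext implements EnumeratorContext {
        private final SketchOperator operator;

        ForwardingContext(SketchOperator operator) {
            this.operator = operator;
        }

        @Override
        public void deleteSplits(List<String> splitIds) {
            operator.handleEvent(new SplitsDeletion(splitIds));
        }
    }

    public static void main(String[] args) {
        SketchReader reader = new SketchReader();
        reader.addSplits(Arrays.asList("topic-0", "topic-1", "topic-2"));
        EnumeratorContext context = new ForwardingContext(new SketchOperator(reader));
        context.deleteSplits(Arrays.asList("topic-1"));
        System.out.println(reader.assignedSplits()); // prints [topic-0, topic-2]
    }
}
```

The real change would also have to address serialization of the event,
checkpointed split state, and in-flight fetches, none of which this sketch
models.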
As an alternative, without modifying flink-connector-base,
KafkaSourceEnumerator could send a custom SourceEvent to SourceOperator for
split deletion and handle it in the Kafka-connector-specific code. But I
think it's better to have SplitsDeletion in flink-connector-base, so other
connectors can use it too.
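For comparison, a rough sketch of the connector-specific alternative. The
SourceEvent interface below is a local stand-in for Flink's marker interface,
and SplitsRemovedEvent and KafkaLikeReader are hypothetical names; in Flink the
hooks would, as far as I understand, be
SplitEnumeratorContext#sendEventToSourceReader on the enumerator side and
SourceReader#handleSourceEvents on the reader side.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Stand-in types only; this does not depend on or reproduce the real Flink API.
final class CustomEventSketch {

    // Local stand-in for Flink's SourceEvent marker interface.
    interface SourceEvent extends Serializable {}

    // Hypothetical Kafka-specific event listing the splits that should be dropped.
    static final class SplitsRemovedEvent implements SourceEvent {
        private static final long serialVersionUID = 1L;
        final List<String> splitIds;

        SplitsRemovedEvent(List<String> splitIds) {
            this.splitIds = new ArrayList<>(splitIds);
        }
    }

    // Hypothetical reader that interprets the custom event itself.
    static final class KafkaLikeReader {
        private final Set<String> assigned = new LinkedHashSet<>();

        void addSplits(List<String> splitIds) {
            assigned.addAll(splitIds);
        }

        // Events of any other type are ignored, so unrelated events stay harmless.
        void handleSourceEvents(SourceEvent event) {
            if (event instanceof SplitsRemovedEvent) {
                assigned.removeAll(((SplitsRemovedEvent) event).splitIds);
            }
        }

        List<String> assignedSplits() {
            return new ArrayList<>(assigned);
        }
    }

    public static void main(String[] args) {
        KafkaLikeReader reader = new KafkaLikeReader();
        reader.addSplits(Arrays.asList("topic-0", "topic-1"));
        reader.handleSourceEvents(new SplitsRemovedEvent(Arrays.asList("topic-0")));
        System.out.println(reader.assignedSplits()); // prints [topic-1]
    }
}
```

The trade-off is visible here: nothing in the shared base changes, but every
connector that wants deletion has to define and handle its own event.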
Let me know if you have any thoughts or ideas. Thanks!
Related Issues: FLINK-30490
was:
Currently, the Flink Kafka Connector does not support split deletion; it is
left as a
[TODO|https://github.com/apache/flink-connector-kafka/blob/9f72be91f8abdfc9b5e8fa46d15dee3f83e71332/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305].
I want to add this feature by doing these steps:
1. Add SplitsDeletion event to flink-connector-base, which currently only has
SplitsAddition.
2. Add a `deleteSplits` method in SplitEnumeratorContext, so it can send a
SplitsDeletion event to the source operator. To maintain compatibility, a
default empty implementation for this method will be added.
3. Make SourceOperator handle the SplitsDeletion event, notifying the
SourceReader to delete splits.
4. Create a deleteSplits method in SourceReader to remove splits, which includes
removing them from the split state and stopping the SourceReader from reading
the deleted splits.
As an alternative, without modifying flink-connector-base,
KafkaSourceEnumerator could send a custom SourceEvent to SourceOperator for
split deletion and handle it in the Kafka-connector-specific code. But I
think it's better to have SplitsDeletion in flink-connector-base, so other
connectors can use it too.
Let me know if you have any thoughts or ideas. Thanks!
Related Issues: [FLINK-30490|https://issues.apache.org/jira/browse/FLINK-30490]
> Implement Split Deletion Support in Flink Kafka Connector
> ---------------------------------------------------------
>
> Key: FLINK-31483
> URL: https://issues.apache.org/jira/browse/FLINK-31483
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / Kafka, Connectors / Parent
> Reporter: Ruibin Xing
> Priority: Major
>
> Currently, the Flink Kafka Connector does not support split deletion; it is
> left as a
> [TODO|https://github.com/apache/flink-connector-kafka/blob/9f72be91f8abdfc9b5e8fa46d15dee3f83e71332/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305].
> I want to add this feature by doing these steps:
> 1. Add SplitsDeletion event to flink-connector-base, which currently only has
> SplitsAddition.
> 2. Add a `deleteSplits` method in SplitEnumeratorContext, so it can send a
> SplitsDeletion event to the source operator. To maintain compatibility, a
> default empty implementation for this method will be added.
> 3. Make SourceOperator handle the SplitsDeletion event, notifying the
> SourceReader to delete splits.
> 4. Create a deleteSplits method in SourceReader to remove splits, which includes
> removing them from the split state and stopping the SourceReader from reading
> the deleted splits.
> As an alternative, without modifying flink-connector-base,
> KafkaSourceEnumerator could send a custom SourceEvent to SourceOperator for
> split deletion and handle it in the Kafka-connector-specific code. But I
> think it's better to have SplitsDeletion in flink-connector-base, so other
> connectors can use it too.
> Let me know if you have any thoughts or ideas. Thanks!
> Related Issues: FLINK-30490