loserwang1024 opened a new pull request, #28: URL: https://github.com/apache/flink-connector-kafka/pull/28
### What is the purpose of the change

As described in [FLIP-288](https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source), the strategy used for new partitions is the same as the initial offset strategy, which is not reasonable.

According to the semantics, if the startup strategy is latest, the consumed data should include all data from the moment of startup, which also includes all messages from newly created partitions. However, the latest strategy may currently be used for new partitions, leading to the loss of some data. For example, a new partition may be created and then discovered by the Kafka source only several minutes later; if "latest" is the initial offset strategy, the messages produced into the partition during that gap may be dropped. If not all data from the new partitions is read, the user's expectations are not met.

For other problems, see the final section: `User specifies OffsetsInitializer for new partition`.

Therefore, it’s better to provide an **EARLIEST** strategy for later discovered partitions.

### Brief change log

1. Expand `KafkaSourceEnumState` with `TopicPartitionWithAssignStatus` to distinguish between initial partitions and newly discovered partitions. `TopicPartitionWithAssignStatus` is also better for future expansion, as new statuses can be added without changing the state structure.
2. Add a `newDiscoveryOffsetsInitializer` (EARLIEST) to get offsets for newly discovered partitions.
3. Modify `KafkaSourceEnumStateSerializer` to handle the expanded `KafkaSourceEnumState`.

### Verifying this change

1. Test the backward compatibility of state when deserializing in `KafkaSourceEnumStateSerializerTest`.
2. Expand the `KafkaEnumeratorTest#testSnapshotState` method to test snapshot state in more scenarios:
   1. Before the first discovery, so the state should be empty
   2. First partition discovery after start, but no assignments to readers
   3. Assign some of the partitions to readers
   4. Assign all partitions to readers
3. Expand the `KafkaEnumeratorTest#testDiscoverPartitionsPeriodically` method to test whether new partitions use the EARLIEST offset while initial partitions use the specified offset strategy.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
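
The offset selection described in the change log can be illustrated with a minimal, dependency-free sketch. It is not the connector code: the class `NewPartitionOffsetSketch`, the enum `OffsetStrategy`, and the method `onDiscovery` are hypothetical names introduced only for illustration, and partitions are modeled as plain strings. It assumes that partitions found in the first discovery use the user-specified strategy and that any partition found later uses EARLIEST.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class NewPartitionOffsetSketch {

    /** Hypothetical stand-in for an offsets initializer; not the connector's interface. */
    public enum OffsetStrategy { EARLIEST, LATEST }

    private final OffsetStrategy startingStrategy;
    private final Set<String> knownPartitions = new HashSet<>();
    private boolean initialDiscoveryFinished = false;

    public NewPartitionOffsetSketch(OffsetStrategy startingStrategy) {
        this.startingStrategy = startingStrategy;
    }

    /**
     * Returns the strategy to use for each partition not seen before.
     * The first call models the initial discovery; later calls model
     * periodic discovery of newly created partitions.
     */
    public Map<String, OffsetStrategy> onDiscovery(Collection<String> fetchedPartitions) {
        // Initial partitions keep the user-specified strategy;
        // later discovered partitions always start from EARLIEST.
        OffsetStrategy strategy =
                initialDiscoveryFinished ? OffsetStrategy.EARLIEST : startingStrategy;
        Map<String, OffsetStrategy> result = new TreeMap<>();
        for (String partition : fetchedPartitions) {
            // Set#add returns true only for partitions not already known.
            if (knownPartitions.add(partition)) {
                result.put(partition, strategy);
            }
        }
        initialDiscoveryFinished = true;
        return result;
    }

    public static void main(String[] args) {
        NewPartitionOffsetSketch sketch = new NewPartitionOffsetSketch(OffsetStrategy.LATEST);
        // Initial discovery: both partitions use the startup strategy.
        System.out.println(sketch.onDiscovery(Arrays.asList("topic-0", "topic-1")));
        // Later discovery: only the new partition is returned, with EARLIEST.
        System.out.println(sketch.onDiscovery(Arrays.asList("topic-0", "topic-1", "topic-2")));
    }
}
```

With a startup strategy of latest, messages written to `topic-2` between its creation and its discovery are still consumed in this model, because the partition is read from its earliest offset.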