[
https://issues.apache.org/jira/browse/FLINK-10806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16889886#comment-16889886
]
Jiangjie Qin commented on FLINK-10806:
--------------------------------------
[~wind_ljy] Thanks for the context. Yes, I agree that the use case you
mentioned is a reasonable one, and the fact that newly discovered partitions
(which may or may not belong to a new topic) are currently set to
{{EARLIEST_OFFSET}} is bad behavior.
What I am wondering is whether you only need to set the consumer offset to the
latest for newly discovered partitions, or whether you may need to set it to
some specific offset. This may impact the delivery guarantee.
For example, consider the following event sequence:
# Initial state: Pipeline A is processing Topic T1. Pipeline B is processing
Topic T2. For simplicity, let's assume topic T1 and T2 both only have 1
partition (P0).
# Stop pipeline A and pipeline B. Reassign both T1 and T2 to pipeline B.
# The upstream producers produce a few more messages into T1.
# Restart pipeline B to consume from both T1 and T2.
In the above case, after restarting pipeline B, the consumption of T2 will
start from the last committed offset, which is probably what we expect.
However, for T1, if the consumption starts from the latest offset, the
messages produced in step 3 will be lost. Is that behavior expected? Do you
want to set the starting offset to where pipeline A left off in this case?
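The potential data loss in step 4 can be sketched numerically. This is an illustration only; all offsets below are invented:

```java
// Illustrative numbers only: models the restart scenario above to show
// when the step-3 messages on T1-P0 are lost.
public class RestartScenario {

    // Messages lost on T1-P0 when pipeline B restarts, depending on whether
    // the newly discovered partition starts from "latest" or resumes from
    // the offset pipeline A last committed.
    static long messagesLost(long committedByA, long logEndOffset, boolean startFromLatest) {
        // Starting from "latest" skips everything between the committed
        // offset and the current log end (i.e. the step-3 messages).
        return startFromLatest ? logEndOffset - committedByA : 0L;
    }

    public static void main(String[] args) {
        long committedByA = 100L; // pipeline A's last committed offset on T1-P0
        long logEnd = 105L;       // five more messages produced in step 3
        System.out.println(messagesLost(committedByA, logEnd, true));  // 5: lost
        System.out.println(messagesLost(committedByA, logEnd, false)); // 0: resumed
    }
}
```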
It is quite easy to always let users configure the consumer to start from
either earliest or latest for new partitions when the group offset is missing.
But allowing a specific starting offset for a newly discovered topic needs
more thought.
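The earliest/latest fallback described above could look roughly like the following. This is a hypothetical sketch, not Flink's actual API; the names {{resolveStartingOffset}} and {{NewPartitionDefault}} are invented for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Flink's actual API: models the lookup the
// connector could do for each discovered partition, falling back to a
// user-configured default only when no restored offset exists.
public class NewPartitionStartOffset {
    enum NewPartitionDefault { EARLIEST, LATEST }

    // Sentinels mirroring Kafka's earliest/latest convention.
    static final long EARLIEST_OFFSET = -2L;
    static final long LATEST_OFFSET = -1L;

    static long resolveStartingOffset(String topicPartition,
                                      Map<String, Long> restoredState,
                                      NewPartitionDefault fallback) {
        Long restored = restoredState.get(topicPartition);
        if (restored != null) {
            // Known partition: resume exactly where we left off.
            return restored;
        }
        // Newly discovered partition: apply the configured default instead
        // of unconditionally using EARLIEST_OFFSET.
        return fallback == NewPartitionDefault.EARLIEST ? EARLIEST_OFFSET : LATEST_OFFSET;
    }

    public static void main(String[] args) {
        Map<String, Long> restored = new HashMap<>();
        restored.put("T2-P0", 42L); // T2 was in the checkpointed state
        System.out.println(resolveStartingOffset("T2-P0", restored, NewPartitionDefault.LATEST)); // 42
        System.out.println(resolveStartingOffset("T1-P0", restored, NewPartitionDefault.LATEST)); // -1
    }
}
```

Note that the consumer's existing start-position settings (e.g. {{setStartFromLatest()}}) apply to the whole source, which is exactly the gap here: there is no per-newly-discovered-partition default.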
> Support multiple consuming offsets when discovering a new topic
> ---------------------------------------------------------------
>
> Key: FLINK-10806
> URL: https://issues.apache.org/jira/browse/FLINK-10806
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: 1.6.2, 1.8.1
> Reporter: Jiayi Liao
> Assignee: Jiayi Liao
> Priority: Major
>
> In KafkaConsumerBase, we discover the TopicPartitions and compare them with
> the restoredState. That is reasonable when a topic's partitions are scaled.
> However, if we add a new topic that has a large amount of data and then
> restore the Flink program, the data of the new topic will be consumed from
> the beginning, which may not be what we want. I think this should be an
> option for developers.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)