[
https://issues.apache.org/jira/browse/FLINK-16865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17071418#comment-17071418
]
zhisheng commented on FLINK-16865:
----------------------------------
Hi [~wind_ljy] [~kangzai],
This is how I am working around the problem for the time being:
{code:java}
// In FlinkKafkaConsumerBase#open(), where a partition missing from the restored state is added.
ExecutionConfig.GlobalJobParameters globalJobParameters =
    getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
// "xxx" is a placeholder for the name of the config key.
String xxx = globalJobParameters.toMap().get("xxx");
if ("group".equals(xxx)) {
    restoredState.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
} else if ("latest".equals(xxx)) {
    restoredState.put(partition, KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
} else {
    // Key unset or unrecognized: keep the existing behavior.
    restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
}
{code}
If a user wants to consume the new topic from the latest offset, they can set
the config (e.g. xxx) in their job:
{code:java}
Configuration configuration = new Configuration();
configuration.setString("xxx", "latest");
env.getConfig().setGlobalJobParameters(configuration);
{code}
When the key is not set, the code falls through to the else branch and uses
EARLIEST_OFFSET, so the original Flink behavior is unchanged.
I'm not sure whether this is a good approach. I'd be glad to hear your
opinions.
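To make the fallback explicit, here is a minimal stand-alone sketch of the same selection logic. It is only an illustration: the class NewPartitionStartup, the enum Sentinel and the method resolve are hypothetical names, the enum stands in for the KafkaTopicPartitionStateSentinel constants so that the example runs without Flink on the classpath, and the null check assumes the job parameters may be absent.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-alone sketch; not part of Flink. */
final class NewPartitionStartup {

    /** Stand-in for the KafkaTopicPartitionStateSentinel constants. */
    enum Sentinel { EARLIEST_OFFSET, LATEST_OFFSET, GROUP_OFFSET }

    /** Placeholder key, as in the comment above. */
    static final String KEY = "xxx";

    /** Picks the start position for a partition that is not in the restored state. */
    static Sentinel resolve(Map<String, String> globalJobParameters) {
        // Assumption: treat missing job parameters like an empty map.
        Map<String, String> params = globalJobParameters == null
                ? Collections.<String, String>emptyMap()
                : globalJobParameters;
        String mode = params.get(KEY);
        if ("group".equals(mode)) {
            return Sentinel.GROUP_OFFSET;
        } else if ("latest".equals(mode)) {
            return Sentinel.LATEST_OFFSET;
        }
        // Key unset or unrecognized: same result as the unpatched code path.
        return Sentinel.EARLIEST_OFFSET;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Collections.singletonMap(KEY, "latest"))); // LATEST_OFFSET
        System.out.println(resolve(Collections.singletonMap(KEY, "group")));  // GROUP_OFFSET
        System.out.println(resolve(Collections.<String, String>emptyMap()));  // EARLIEST_OFFSET
        System.out.println(resolve(null));                                    // EARLIEST_OFFSET
    }
}
```

Keeping EARLIEST_OFFSET as the default means a job that never sets the key behaves exactly as before.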
> [Flink Kafka Connector] Restore from Savepoint, if add new Kafka topic, Flink
> will consume the new topic from earliest, it may cause duplicate data sink
> ---------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-16865
> URL: https://issues.apache.org/jira/browse/FLINK-16865
> Project: Flink
> Issue Type: Bug
> Reporter: zhisheng
> Priority: Major
> Attachments: image-2020-03-30-19-57-42-451.png
>
>
> h3. 【Flink Kafka Connector】
> If a job adds another Kafka topic when it is restarted from a savepoint, it
> consumes that topic from the earliest offset, which may cause duplicate data
> in the sink.
> I found that this behavior is hard-coded in FlinkKafkaConsumerBase#open();
> maybe it could be made configurable.
>
> !image-2020-03-30-19-57-42-451.png!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)