[ 
https://issues.apache.org/jira/browse/FLINK-29674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hongcha updated FLINK-29674:
----------------------------
    Summary: Apache Kafka Connector's "setBounded" not valid  (was: Apache 
Kafka connector's "setBounded" does not take effect)

> Apache Kafka Connector's "setBounded" not valid
> -----------------------------------------------
>
>                 Key: FLINK-29674
>                 URL: https://issues.apache.org/jira/browse/FLINK-29674
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: hongcha
>            Priority: Major
>
> I am using the official Apache Kafka connector, which provides "setBounded"
> for setting a stopping offset on the Kafka source. When my job runs without
> failures, the bound takes effect and the job finishes after consuming the
> bounded data. However, when the job fails and I restore it from the
> checkpoint that was in use at the time of the failure, the job never
> finishes and keeps running, even though the log shows that data has been
> consumed up to the stopping offset I set. I am not sure whether I am using
> the API incorrectly. Here is part of my code:
>  
> {code:java}
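> // Excerpt from the job; `properties` is not shown in this excerpt.
> import java.util.HashMap;
> import java.util.Map;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.kafka.common.TopicPartition;
>
> String topicName = "jw-test-kafka-w-offset-002";
> // Stopping offset: partition 0 is bounded at offset 6.
> Map<TopicPartition, Long> offsets = new HashMap<>();
> offsets.put(new TopicPartition(topicName, 0), 6L);
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setBootstrapServers("xxx:9092")
>         .setProperties(properties)
>         .setTopics(topicName)
>         .setGroupId("my-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .setBounded(OffsetsInitializer.offsets(offsets))
>         .build();
> {code}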
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
