[ 
https://issues.apache.org/jira/browse/FLINK-29674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hongcha updated FLINK-29674:
----------------------------
    Description: 
When I use the Kafka connector and set a consumption bound for Kafka 
("setBounded"), the bound works as long as my job runs without failures: the 
job finishes once the bound is reached. However, when my job fails and I 
restore it from the checkpoint taken before the failure, the job can no longer 
finish and keeps running, even though I can see in the log that data has been 
consumed up to the bound I set. I don't know whether there is a problem with 
my usage; here is part of my code:

 
{code:java}
// code placeholder
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.common.TopicPartition;

String topicName = "jw-test-kafka-w-offset-002";
Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
offsets.put(new TopicPartition(topicName, 0), 6L);
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("xxx:9092")
        .setProperties(properties)
        .setTopics(topicName)
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setBounded(OffsetsInitializer.offsets(offsets))
        .build();
{code}
 

 

  was:
When I use the official Apache Kafka connector, I see that it provides a way 
to set a consumption bound for Kafka ("setBounded"). When my job runs without 
failures, the bound works and the job finishes consuming the bounded data 
normally. However, when my job fails and I restore it from the checkpoint 
taken before the failure, the job can no longer finish and keeps running, even 
though I can see in the log that data has been consumed up to the bound I set. 
I don't know whether there is a problem with my usage; here is part of my code:

 
{code:java}
// code placeholder
String topicName = "jw-test-kafka-w-offset-002";
Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
offsets.put(new TopicPartition(topicName,0), 6L);
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("xxx:9092")
        .setProperties(properties)
        .setTopics(topicName)
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setBounded(OffsetsInitializer.offsets(offsets))
        .build(); {code}
 

 


> Apache Kafka Connector's "setBounded" not valid
> ------------------------------------------------
>
>                 Key: FLINK-29674
>                 URL: https://issues.apache.org/jira/browse/FLINK-29674
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: hongcha
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
