[ 
https://issues.apache.org/jira/browse/FLINK-29674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiangwei updated FLINK-29674:
-----------------------------
    Description: 
我在使用官方提供的Apache Kafka 
连接器时,我看到官方有提供设置kafka的消费边界的方法(setBounded),当我的job正常执行时(无故障),我设置的边界是有效的,我的job在消费到边界数据时能正常完成,但是当我的job出现故障时,我使用故障时的checkpoint进行重新恢复job时,发现我的job无法正常完成,一直处于running,但是我看日志能看到数据是已经消费到我设置的边界点。我不知道是不是我的用法有问题,以下是我的部分代码:

 
{code:java}
//代码占位符
String topicName = "jw-test-kafka-w-offset-002";
Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
offsets.put(new TopicPartition(topicName,0), 6L);
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("xxx:9092")
        .setProperties(properties)
        .setTopics(topicName)
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setBounded(OffsetsInitializer.offsets(offsets))
        .build(); {code}
 

 

  was:
我在使用官方提供的Apache Kafka 
连接器时,我看到官方有提供设置kafka的消费边界的方法(setBounded),当我的job正常执行时(无故障),我设置的边界是有效的,我的job在消费到边界数据时能正常完成,但是当我的job出现故障时,我使用故障时的checkpoint进行重新恢复job时,发现我的job无法正常完成,一直处于running,但是我看日志能看到数据是已经消费到我设置的边界点。我不知道是不是我的用法有问题,以下是我的部分代码:

 
{code:java}
//代码占位符
String topicName = "jw-test-kafka-w-offset-002";Map<TopicPartition, Long> 
offsets = new HashMap<TopicPartition, Long>();offsets.put(new 
TopicPartition(topicName,0), 6L);
KafkaSource<String> source = KafkaSource.<String>builder()        
.setBootstrapServers("xxx:9092")        .setProperties(properties)        
.setTopics(topicName)        .setGroupId("my-group")        
.setStartingOffsets(OffsetsInitializer.earliest())        
.setValueOnlyDeserializer(new SimpleStringSchema())        
.setBounded(OffsetsInitializer.offsets(offsets))        .build(); {code}
 

 


> Apache Kafka 连接器的“ setBounded”不生效
> ---------------------------------
>
>                 Key: FLINK-29674
>                 URL: https://issues.apache.org/jira/browse/FLINK-29674
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: jiangwei
>            Priority: Major
>
> 我在使用官方提供的Apache Kafka 
> 连接器时,我看到官方有提供设置kafka的消费边界的方法(setBounded),当我的job正常执行时(无故障),我设置的边界是有效的,我的job在消费到边界数据时能正常完成,但是当我的job出现故障时,我使用故障时的checkpoint进行重新恢复job时,发现我的job无法正常完成,一直处于running,但是我看日志能看到数据是已经消费到我设置的边界点。我不知道是不是我的用法有问题,以下是我的部分代码:
>  
> {code:java}
> //代码占位符
> String topicName = "jw-test-kafka-w-offset-002";
> Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
> offsets.put(new TopicPartition(topicName,0), 6L);
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setBootstrapServers("xxx:9092")
>         .setProperties(properties)
>         .setTopics(topicName)
>         .setGroupId("my-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .setBounded(OffsetsInitializer.offsets(offsets))
>         .build(); {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to