[ https://issues.apache.org/jira/browse/FLINK-29674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
jiangwei updated FLINK-29674: ----------------------------- Description: 我在使用官方提供的Apache Kafka 连接器时,我看到官方有提供设置kafka的消费边界的方法(setBounded),当我的job正常执行时(无故障),我设置的边界是有效的,我的job在消费到边界数据时能正常完成,但是当我的job出现故障时,我使用故障时的checkpoint进行重新恢复job时,发现我的job无法正常完成,一直处于running,但是我看日志能看到数据是已经消费到我设置的边界点。我不知道是不是我的用法有问题,以下是我的部分代码: {code:java} //代码占位符 String topicName = "jw-test-kafka-w-offset-002"; Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(); offsets.put(new TopicPartition(topicName,0), 6L); KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers("xxx:9092") .setProperties(properties) .setTopics(topicName) .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .setBounded(OffsetsInitializer.offsets(offsets)) .build(); {code} was: 我在使用官方提供的Apache Kafka 连接器时,我看到官方有提供设置kafka的消费边界的方法(setBounded),当我的job正常执行时(无故障),我设置的边界是有效的,我的job在消费到边界数据时能正常完成,但是当我的job出现故障时,我使用故障时的checkpoint进行重新恢复job时,发现我的job无法正常完成,一直处于running,但是我看日志能看到数据是已经消费到我设置的边界点。我不知道是不是我的用法有问题,以下是我的部分代码: {code:java} //代码占位符 String topicName = "jw-test-kafka-w-offset-002";Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();offsets.put(new TopicPartition(topicName,0), 6L); KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers("xxx:9092") .setProperties(properties) .setTopics(topicName) .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .setBounded(OffsetsInitializer.offsets(offsets)) .build(); {code} > Apache Kafka 连接器的“ setBounded”不生效 > --------------------------------- > > Key: FLINK-29674 > URL: https://issues.apache.org/jira/browse/FLINK-29674 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka > Reporter: jiangwei > Priority: Major > > 我在使用官方提供的Apache Kafka > 连接器时,我看到官方有提供设置kafka的消费边界的方法(setBounded),当我的job正常执行时(无故障),我设置的边界是有效的,我的job在消费到边界数据时能正常完成,但是当我的job出现故障时,我使用故障时的checkpoint进行重新恢复job时,发现我的job无法正常完成,一直处于running,但是我看日志能看到数据是已经消费到我设置的边界点。我不知道是不是我的用法有问题,以下是我的部分代码: > > {code:java} > //代码占位符 > String topicName = "jw-test-kafka-w-offset-002"; > Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(); > offsets.put(new TopicPartition(topicName,0), 6L); > KafkaSource<String> source = KafkaSource.<String>builder() > .setBootstrapServers("xxx:9092") > .setProperties(properties) > .setTopics(topicName) > .setGroupId("my-group") > .setStartingOffsets(OffsetsInitializer.earliest()) > .setValueOnlyDeserializer(new SimpleStringSchema()) > .setBounded(OffsetsInitializer.offsets(offsets)) > .build(); {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)