[ https://issues.apache.org/jira/browse/FLINK-29674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
jiangwei updated FLINK-29674:
-----------------------------
Description:
When using the official Apache Kafka connector, I saw that it provides a way
to set a consumption boundary for Kafka ({{setBounded}}). When my job runs
normally (without failures), the boundary is effective and the job finishes
once it has consumed up to the bounded data. However, when my job fails and I
restore it from the checkpoint taken at the time of the failure, the job never
finishes and stays in RUNNING, even though the logs show that the data has
been consumed up to the boundary I set. I don't know whether there is a
problem with my usage; here is part of my code:
{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.common.TopicPartition;

String topicName = "jw-test-kafka-w-offset-002";

// Stop consuming partition 0 once offset 6 is reached.
Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(new TopicPartition(topicName, 0), 6L);

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("xxx:9092")
        .setProperties(properties)
        .setTopics(topicName)
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setBounded(OffsetsInitializer.offsets(offsets))
        .build(); {code}
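For completeness, here is a minimal sketch of how the source above is wired
into a job with checkpointing enabled (the checkpoint interval, watermark
strategy, sink, and job name below are illustrative placeholders, not the
exact code from my job):
{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing so the job can be restored after a failure.
env.enableCheckpointing(10_000L);

// With the bounded source, the job is expected to reach FINISHED
// once partition 0 has been consumed up to offset 6.
env.fromSource(source, WatermarkStrategy.noWatermarks(), "bounded-kafka-source")
        .print();

env.execute("bounded-kafka-test"); {code}
The problem only appears on the restore path: a fresh run of such a job
finishes as expected, but the same job restored from a checkpoint stays in
RUNNING.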
was:
When using the official Apache Kafka connector, I saw that it provides a
method for setting a Kafka consumption boundary (setBounded). When my job runs
normally (without failures), the boundary I set is effective and the job
finishes normally once it has consumed up to the boundary. But when my job
fails and I restore it from the checkpoint taken at the time of the failure,
the job cannot finish and stays in RUNNING, even though the logs show the data
has already been consumed up to the boundary point I set. I don't know whether
there is a problem with my usage; here is part of my code:
{code:java}
// code placeholder
String topicName = "jw-test-kafka-w-offset-002";
Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
offsets.put(new TopicPartition(topicName, 0), 6L);
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("xxx:9092")
        .setProperties(properties)
        .setTopics(topicName)
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setBounded(OffsetsInitializer.offsets(offsets))
        .build(); {code}
> Apache Kafka connector's "setBounded" does not take effect
> ----------------------------------------------------------
>
> Key: FLINK-29674
> URL: https://issues.apache.org/jira/browse/FLINK-29674
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Reporter: jiangwei
> Priority: Major
>
> When using the official Apache Kafka connector, I saw that it provides a way
> to set a consumption boundary for Kafka ({{setBounded}}). When my job runs
> normally (without failures), the boundary is effective and the job finishes
> once it has consumed up to the bounded data. However, when my job fails and I
> restore it from the checkpoint taken at the time of the failure, the job
> never finishes and stays in RUNNING, even though the logs show that the data
> has been consumed up to the boundary I set. I don't know whether there is a
> problem with my usage; here is part of my code:
>
> {code:java}
> // code placeholder
> String topicName = "jw-test-kafka-w-offset-002";
> Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
> offsets.put(new TopicPartition(topicName, 0), 6L);
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setBootstrapServers("xxx:9092")
>         .setProperties(properties)
>         .setTopics(topicName)
>         .setGroupId("my-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .setBounded(OffsetsInitializer.offsets(offsets))
>         .build(); {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)