[
https://issues.apache.org/jira/browse/FLINK-29674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
hongcha updated FLINK-29674:
----------------------------
Description:
When I use the Kafka connector and set a consumption boundary for Kafka via
setBounded, the bound is honored while my job runs normally (with no failure),
and the job finishes. However, when my job fails and I restore it from the
checkpoint taken during the failure, the job never completes and keeps running,
even though the log shows that data has been consumed up to the boundary I set.
I don't know whether there is a problem with my usage; here is part of my code:
{code:java}
// code placeholder
String topicName = "jw-test-kafka-w-offset-002";
Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
offsets.put(new TopicPartition(topicName, 0), 6L);
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("xxx:9092")
        .setProperties(properties)
        .setTopics(topicName)
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setBounded(OffsetsInitializer.offsets(offsets))
        .build(); {code}
was:
When I use the official Apache Kafka connector, I see that an official means to
set Kafka's consumption boundary (setBounded) is provided. When my job runs
normally (with no bugs), the bound is honored and the job finishes consuming
the bounded data. However, when my job fails and I restore it from the
checkpoint taken during the failure, the job never completes and keeps running,
even though the log shows that data has been consumed up to the threshold I
set. I don't know whether there is a problem with my usage; here is part of my
code:
{code:java}
// code placeholder
String topicName = "jw-test-kafka-w-offset-002";
Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
offsets.put(new TopicPartition(topicName, 0), 6L);
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("xxx:9092")
        .setProperties(properties)
        .setTopics(topicName)
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setBounded(OffsetsInitializer.offsets(offsets))
        .build(); {code}
> Apache Kafka Connector's "setBounded" not valid
> ------------------------------------------------
>
> Key: FLINK-29674
> URL: https://issues.apache.org/jira/browse/FLINK-29674
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Reporter: hongcha
> Priority: Major
>
> When I use the Kafka connector and set a consumption boundary for Kafka via
> setBounded, the bound is honored while my job runs normally (with no
> failure), and the job finishes. However, when my job fails and I restore it
> from the checkpoint taken during the failure, the job never completes and
> keeps running, even though the log shows that data has been consumed up to
> the boundary I set. I don't know whether there is a problem with my usage;
> here is part of my code:
>
> {code:java}
> // code placeholder
> String topicName = "jw-test-kafka-w-offset-002";
> Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
> offsets.put(new TopicPartition(topicName, 0), 6L);
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setBootstrapServers("xxx:9092")
>         .setProperties(properties)
>         .setTopics(topicName)
>         .setGroupId("my-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .setBounded(OffsetsInitializer.offsets(offsets))
>         .build(); {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)