[
https://issues.apache.org/jira/browse/FLINK-29674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624290#comment-17624290
]
hongcha commented on FLINK-29674:
---------------------------------
[~martijnvisser] [~coderap] I looked at part of the source code and found what
looks like a problem in the implementation of the "dedupOrMarkAsRemoved" object
in the "KafkaSourceEnumerator.getPartitionChange" method: after an element is
removed from "fetchedPartitions", it is not added to "removedPartitions". I
changed the condition here, and the job now behaves normally.
{code:java}
@VisibleForTesting
PartitionChange getPartitionChange(Set<TopicPartition> fetchedPartitions) {
    final Set<TopicPartition> removedPartitions = new HashSet<>();
    Consumer<TopicPartition> dedupOrMarkAsRemoved =
            (tp) -> {
                if (!fetchedPartitions.remove(tp)) {
                    removedPartitions.add(tp);
                }
            };
    .... {code}
Modified code:
{code:java}
@VisibleForTesting
PartitionChange getPartitionChange(Set<TopicPartition> fetchedPartitions) {
    final Set<TopicPartition> removedPartitions = new HashSet<>();
    Consumer<TopicPartition> dedupOrMarkAsRemoved =
            (tp) -> {
                if (fetchedPartitions.remove(tp)) {
                    removedPartitions.add(tp);
                }
            };
    ....{code}
So is this a bug?
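
To compare the two conditions in isolation, here is a minimal standalone sketch. It is not the Flink source: the class name PartitionChangeSketch, the method markRemoved, the invertCondition flag, and the use of plain strings in place of TopicPartition are all assumptions made for illustration. It only shows what each variant of the lambda puts into the "removed" set, given a set of already-known partitions and a set of freshly fetched ones.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;

// Hypothetical stand-in for the logic discussed above; not Flink code.
public class PartitionChangeSketch {

    // Returns the partitions that the lambda marks as removed.
    // invertCondition == false mirrors the first snippet:  if (!fetched.remove(tp))
    // invertCondition == true  mirrors the modified one:   if (fetched.remove(tp))
    static Set<String> markRemoved(
            Set<String> known, Set<String> fetched, boolean invertCondition) {
        Set<String> removed = new TreeSet<>();
        Consumer<String> dedupOrMarkAsRemoved =
                tp -> {
                    // Set.remove returns true only if tp was present in fetched.
                    boolean wasFetched = fetched.remove(tp);
                    if (wasFetched == invertCondition) {
                        removed.add(tp);
                    }
                };
        known.forEach(dedupOrMarkAsRemoved);
        return removed;
    }

    public static void main(String[] args) {
        // Assumed example data: t-0 and t-1 are already known to the enumerator,
        // while the latest fetch returns t-1 and t-2.
        Set<String> known = new HashSet<>(Arrays.asList("t-0", "t-1"));

        Set<String> fetchedA = new HashSet<>(Arrays.asList("t-1", "t-2"));
        Set<String> removedA = markRemoved(known, fetchedA, false);
        // prints: first snippet    -> removed=[t-0], remaining fetched=[t-2]
        System.out.println(
                "first snippet    -> removed=" + removedA + ", remaining fetched=" + fetchedA);

        Set<String> fetchedB = new HashSet<>(Arrays.asList("t-1", "t-2"));
        Set<String> removedB = markRemoved(known, fetchedB, true);
        // prints: modified snippet -> removed=[t-1], remaining fetched=[t-2]
        System.out.println(
                "modified snippet -> removed=" + removedB + ", remaining fetched=" + fetchedB);
    }
}
```

In this sketch the first condition marks only t-0, the known partition that is missing from the fetched set, while the modified condition marks t-1, the known partition that is still present in the fetched set. What is left in the fetched set after the loop is the same in both cases.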
> Apache Kafka Connector‘s “ setBounded” not valid
> ------------------------------------------------
>
> Key: FLINK-29674
> URL: https://issues.apache.org/jira/browse/FLINK-29674
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.15.2
> Reporter: hongcha
> Priority: Major
> Attachments: image-2022-10-18-20-38-34-515.png
>
>
> I'm using the Kafka connector and setting a consumption boundary with
> "setBounded". When my job runs normally (with no failure), the bound is
> respected and the job finishes. However, when the job fails and I restore it
> from the checkpoint used at the time of the failure, the job never completes
> and keeps running, even though I can see in the log that data has been
> consumed up to the boundary I set. I don't know whether there is a problem
> with my usage; here is part of my code:
>
> {code:java}
> // Code placeholder
> String topicName = "jw-test-kafka-w-offset-002";
> Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
> offsets.put(new TopicPartition(topicName, 0), 6L);
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setBootstrapServers("xxx:9092")
>         .setProperties(properties)
>         .setTopics(topicName)
>         .setGroupId("my-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .setBounded(OffsetsInitializer.offsets(offsets))
>         .build(); {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)