lindong28 commented on a change in pull request #17342:
URL: https://github.com/apache/flink/pull/17342#discussion_r717524891
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java
##########
@@ -73,7 +73,7 @@ public void commitOffsets(
if (offsetsToCommit.isEmpty()) {
return;
}
- SplitFetcher<Tuple3<T, Long, Long>, KafkaPartitionSplit> splitFetcher = fetchers.get(0);
+ SplitFetcher<Tuple3<T, Long, Long>, KafkaPartitionSplit> splitFetcher = getRunningFetcher();
Review comment:
Thanks for the review @fapaul. This change does not affect the fix. I
have updated the PR to remove this change.
Regarding why this could improve performance: suppose the first fetcher
created by this `KafkaSourceFetcherManager` has been closed and removed
from `fetchers`. Prior to this change, every time `commitOffsets()` is
called, it creates a new `SplitFetcher` just to commit the offsets. If
`commitOffsets()` is called N times, then N `SplitFetcher`s are created,
which is quite inefficient.
To fix this, we can commit the offsets using any running fetcher in
`fetchers`, which is what calling `getRunningFetcher()` here achieves; a
simplified sketch of the idea follows.
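To make the idea concrete, here is a minimal, self-contained sketch of the
"reuse a running fetcher" pattern. The `Fetcher` class, the `fetchers` map
and the helper methods below are simplified stand-ins invented for this
example, not Flink's actual `SplitFetcher`/`SplitFetcherManager` API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Sketch only: simplified stand-ins, not the real SplitFetcher API. */
public class FetcherReuseSketch {

    /** Hypothetical stand-in for a SplitFetcher that executes enqueued tasks. */
    static final class Fetcher {
        final int id;

        Fetcher(int id) {
            this.id = id;
        }

        void enqueueCommitTask(Map<String, Long> offsets) {
            System.out.println("fetcher-" + id + " committing " + offsets);
        }

        void shutdown() {
            System.out.println("fetcher-" + id + " shut down");
        }
    }

    private final Map<Integer, Fetcher> fetchers = new HashMap<>();
    private int nextFetcherId = 0;

    /** Analogous to getRunningFetcher(): return any live fetcher, if present. */
    private Optional<Fetcher> getRunningFetcher() {
        return fetchers.values().stream().findAny();
    }

    private Fetcher createFetcher() {
        Fetcher fetcher = new Fetcher(nextFetcherId);
        fetchers.put(nextFetcherId++, fetcher);
        return fetcher;
    }

    /**
     * Prefer an already running fetcher for the commit; only create (and then
     * tear down) a temporary one when no fetcher is alive. Without the reuse,
     * every commit after the original fetcher was removed would create a new
     * fetcher.
     */
    void commitOffsets(Map<String, Long> offsetsToCommit) {
        if (offsetsToCommit.isEmpty()) {
            return;
        }
        Fetcher fetcher = getRunningFetcher().orElse(null);
        if (fetcher != null) {
            fetcher.enqueueCommitTask(offsetsToCommit);
        } else {
            // Rare path: no fetcher is running, so use a short-lived one.
            fetcher = createFetcher();
            fetcher.enqueueCommitTask(offsetsToCommit);
            fetcher.shutdown();
            fetchers.remove(fetcher.id);
        }
    }

    public static void main(String[] args) {
        FetcherReuseSketch manager = new FetcherReuseSketch();
        // No fetcher is running, so a temporary one is created and torn down.
        manager.commitOffsets(Map.of("topic-0", 42L));
    }
}
```

With the real classes, the equivalent of the `else` branch is what used to
run on every commit once the first fetcher was gone; after the change it
only runs when no fetcher is alive at all.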
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java
##########
@@ -73,7 +73,7 @@ public void commitOffsets(
if (offsetsToCommit.isEmpty()) {
return;
}
- SplitFetcher<Tuple3<T, Long, Long>, KafkaPartitionSplit> splitFetcher = fetchers.get(0);
+ SplitFetcher<Tuple3<T, Long, Long>, KafkaPartitionSplit> splitFetcher = getRunningFetcher();
Review comment:
I created https://issues.apache.org/jira/browse/FLINK-24398 to track
this issue.