zhangshenghang opened a new pull request, #6961:
URL: https://github.com/apache/seatunnel/pull/6961
### Purpose of this pull request
This PR fixes a logic error in how the Kafka consumer group's offset auto-commit is configured.
When the Kafka source option `commit_on_checkpoint` is set to `true`, checkpointing is enabled and `KafkaSourceReader` commits offsets manually. The Kafka consumer should therefore have auto-commit disabled, but the `initConsumer` method in the `KafkaConsumerThread` class currently enables it:
`props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(autoCommit));`
Conversely, when `commit_on_checkpoint` is set to `false`, the same call sets `enable.auto.commit` to `false`, so offsets are committed neither manually nor automatically.
I believe this is a bug, and this PR fixes it by passing the negated value of `commit_on_checkpoint` as `autoCommit`.
KafkaSourceReader.java
```java
public void notifyCheckpointComplete(long checkpointId) {
    if (!checkpointOffsetMap.containsKey(checkpointId)) {
        log.warn("checkpoint {} do not exist or have already been committed.", checkpointId);
    } else {
        checkpointOffsetMap
                .remove(checkpointId)
                .forEach(
                        (topicPartition, offset) -> {
                            try {
                                consumerThreadMap
                                        .get(topicPartition)
                                        .getTasks()
                                        .put(
                                                consumer -> {
                                                    if (kafkaSourceConfig.isCommitOnCheckpoint()) {
                                                        Map<TopicPartition, OffsetAndMetadata> offsets =
                                                                new HashMap<>();
                                                        if (offset >= 0) {
                                                            offsets.put(
                                                                    topicPartition,
                                                                    new OffsetAndMetadata(offset));
                                                            consumer.commitSync(offsets);
                                                        }
                                                    }
                                                });
                            } catch (InterruptedException e) {
                                log.error("commit offset to kafka failed", e);
                            }
                        });
    }
}
```
KafkaConsumerThread.java
```java
...
public KafkaConsumerThread(KafkaSourceConfig kafkaSourceConfig, ConsumerMetadata metadata) {
    this.metadata = metadata;
    this.tasks = new LinkedBlockingQueue<>();
    this.consumer =
            initConsumer(
                    kafkaSourceConfig.getBootstrap(),
                    metadata.getConsumerGroup(),
                    kafkaSourceConfig.getProperties(),
                    !kafkaSourceConfig.isCommitOnCheckpoint());
}
...
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(autoCommit));
...
```
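The intended relationship between `commit_on_checkpoint` and the consumer's auto-commit setting can be illustrated with a minimal, dependency-free sketch. The class `AutoCommitSketch` and its method `consumerProps` are hypothetical names used only for illustration and are not part of SeaTunnel; the property key is written as the literal `enable.auto.commit`, which is the value of `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG` in the Kafka client.

```java
import java.util.Properties;

// Hypothetical stand-in for the consumer setup; not the SeaTunnel class itself.
class AutoCommitSketch {
    // Same string value as ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG in the Kafka client.
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    // Builds consumer properties so that exactly one commit mechanism is active.
    static Properties consumerProps(boolean commitOnCheckpoint) {
        Properties props = new Properties();
        // Offsets committed on checkpoint => the consumer must not auto-commit.
        // Offsets not committed on checkpoint => fall back to auto-commit.
        boolean autoCommit = !commitOnCheckpoint;
        props.setProperty(ENABLE_AUTO_COMMIT, String.valueOf(autoCommit));
        return props;
    }

    public static void main(String[] args) {
        // commit_on_checkpoint = true  -> prints "false"
        System.out.println(consumerProps(true).getProperty(ENABLE_AUTO_COMMIT));
        // commit_on_checkpoint = false -> prints "true"
        System.out.println(consumerProps(false).getProperty(ENABLE_AUTO_COMMIT));
    }
}
```

With this mapping, offsets are always committed by exactly one path: manually in `notifyCheckpointComplete` when `commit_on_checkpoint` is `true`, or by the Kafka client's auto-commit otherwise.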
### Check list
* [ ] If your PR adds any new Jar binary package, please add a License Notice according to the [New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
* [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
* [ ] If you are contributing connector code, please check that the following files are updated:
  1. Update the change log in the connector document. For more details, refer to [connector-v2](https://github.com/apache/seatunnel/tree/dev/docs/en/connector-v2)
  2. Update [plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties) and add the new connector information to it
  3. Update the pom file of [seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
* [ ] Update the [`release-note`](https://github.com/apache/seatunnel/blob/dev/release-note.md).
--
This is an automated message from the Apache Git Service.