This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e60ad938b fix commit kafka offset bug. (#3933)
e60ad938b is described below
commit e60ad938be6e43549fe36d9d9380a0193f91b9e8
Author: lightzhao <[email protected]>
AuthorDate: Mon Jan 16 21:33:05 2023 +0800
fix commit kafka offset bug. (#3933)
Co-authored-by: zhaoliang01 <[email protected]>
---
.../seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 28e246d4b..d354c3d40 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -160,7 +160,7 @@ public class KafkaSourceReader implements
SourceReader<SeaTunnelRow, KafkaSource
@Override
public List<KafkaSourceSplit> snapshotState(long checkpointId) {
checkpointOffsetMap.put(checkpointId, sourceSplits.stream()
- .collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition,
KafkaSourceSplit::getEndOffset)));
+ .collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition,
KafkaSourceSplit::getStartOffset)));
return
sourceSplits.stream().map(KafkaSourceSplit::copy).collect(Collectors.toList());
}