This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e60ad938b fix commit kafka offset bug. (#3933)
e60ad938b is described below

commit e60ad938be6e43549fe36d9d9380a0193f91b9e8
Author: lightzhao <[email protected]>
AuthorDate: Mon Jan 16 21:33:05 2023 +0800

    fix commit kafka offset bug. (#3933)
    
    Co-authored-by: zhaoliang01 <[email protected]>
---
 .../seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 28e246d4b..d354c3d40 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -160,7 +160,7 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
     @Override
     public List<KafkaSourceSplit> snapshotState(long checkpointId) {
         checkpointOffsetMap.put(checkpointId, sourceSplits.stream()
-            .collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, KafkaSourceSplit::getEndOffset)));
+            .collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, KafkaSourceSplit::getStartOffset)));
         return sourceSplits.stream().map(KafkaSourceSplit::copy).collect(Collectors.toList());
     }
 

Reply via email to