This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3da19d444 [Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config (#3157)
3da19d444 is described below

commit 3da19d444408b4a8fc71d4c73a39d2754af467ca
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Tue Nov 8 19:17:22 2022 +0800

    [Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config (#3157)
---
 docs/en/connector-v2/source/kafka.md               |  25 ++++-
 .../connectors/seatunnel/kafka/config/Config.java  |  15 +++
 .../seatunnel/kafka/config/StartMode.java          |  46 ++++++++
 .../seatunnel/kafka/source/ConsumerMetadata.java   |  32 ++++++
 .../seatunnel/kafka/source/KafkaSource.java        |  56 +++++++++-
 .../kafka/source/KafkaSourceSplitEnumerator.java   | 121 ++++++++++++++++-----
 .../flink/v2/kafka/KafkaSourceJsonToConsoleIT.java |  65 +----------
 .../kafka/KafkaSourceStartConfigToConsoleIT.java   |  98 +++++++++++++++++
 .../flink/v2/kafka/KafkaSourceTextToConsoleIT.java |  66 +----------
 .../e2e/flink/v2/kafka/KafkaTestBaseIT.java        |  90 +++++++++++++++
 .../kafka/kafkasource_earliest_to_console.conf}    |  89 +++++++--------
 .../kafkasource_group_offset_to_console.conf}      |  89 +++++++--------
 .../kafka/kafkasource_latest_to_console.conf}      |  89 +++++++--------
 .../kafkasource_specific_offsets_to_console.conf}  |  93 +++++++---------
 .../kafka/kafkasource_timestamp_to_console.conf}   |  90 +++++++--------
 .../e2e/spark/v2/kafka/KafkaContainer.java         |   2 +-
 .../spark/v2/kafka/KafkaSourceJsonToConsoleIT.java |  62 +----------
 .../kafka/KafkaSourceStartConfigToConsoleIT.java   |  98 +++++++++++++++++
 .../spark/v2/kafka/KafkaSourceTextToConsoleIT.java |  63 +----------
 .../e2e/spark/v2/kafka/KafkaTestBaseIT.java        |  90 +++++++++++++++
 ...e.conf => kafkasource_earliest_to_console.conf} |  85 ++++++---------
 ...nf => kafkasource_group_offset_to_console.conf} |  85 ++++++---------
 .../kafka/kafkasource_json_to_console.conf         |   4 +-
 ...ole.conf => kafkasource_latest_to_console.conf} |  85 ++++++---------
 ...> kafkasource_specific_offsets_to_console.conf} |  89 +++++++--------
 .../kafka/kafkasource_text_to_console.conf         |   4 +-
 ....conf => kafkasource_timestamp_to_console.conf} |  86 ++++++---------
 27 files changed, 1020 insertions(+), 797 deletions(-)

diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md
index 425817ca5..966ebd972 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -18,7 +18,7 @@ Source connector for Apache Kafka.
 ## Options
 
 | name                 | type    | required | default value            |
-| -------------------- | ------- | -------- | ------------------------ |
+|----------------------|---------| -------- |--------------------------|
 | topic                | String  | yes      | -                        |
 | bootstrap.servers    | String  | yes      | -                        |
 | pattern              | Boolean | no       | false                    |
@@ -28,6 +28,9 @@ Source connector for Apache Kafka.
 | common-options       |         | no       | -                        |
 | schema               |         | no       | -                        |
 | format               | String  | no       | json                     |
+| start_mode           | String  | no       | group_offsets            |
+| start_mode.offsets   |         | no       |                          |
+| start_mode.timestamp | Long    | no       |                          |
 
 ### topic [string]
 
@@ -66,6 +69,24 @@ The structure of the data, including field names and field types.
 Data format. The default format is json. Optional text format. The default field separator is ", ".
 If you customize the delimiter, add the "field_delimiter" option.
 
+### start_mode [string]
+
+The initial consumption mode of the consumer. The following modes are supported:
+`earliest`, `group_offsets`, `latest`, `specific_offsets`, `timestamp`.
+
+### start_mode.timestamp [long]
+
+The start timestamp (in epoch milliseconds) to consume from. Required when start_mode is `timestamp`.
+
+### start_mode.offsets
+
+The start offset for each partition. Required when start_mode is `specific_offsets`. For example:
+
+```hocon
+start_mode.offsets = {
+    info-0 = 70
+    info-1 = 10
+    info-2 = 10
+}
+```
+
 ## Example
 
 ###  Simple
@@ -84,7 +105,7 @@ source {
     format = text
     field_delimiter = "#“
     topic = "topic_1,topic_2,topic_3"
-    bootstrap.server = "localhost:9092"
+    bootstrap.servers = "localhost:9092"
     kafka.max.poll.records = 500
     kafka.client.id = client_1
   }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 2dedcd13d..f9c28d546 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -90,4 +90,19 @@ public class Config {
      */
     public static final String PARTITION_KEY = "partition_key";
 
+    /**
+     * The initial consumption mode of the consumer.
+     */
+    public static final String START_MODE = "start_mode";
+
+    /**
+     * The start timestamp used when start_mode is timestamp.
+     */
+    public static final String START_MODE_TIMESTAMP = "start_mode.timestamp";
+
+    /**
+     * The start offsets used when start_mode is specific_offsets.
+     */
+    public static final String START_MODE_OFFSETS = "start_mode.offsets";
+
 }
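For context, these keys are plain paths into the job's HOCON config. Below is a minimal sketch of that lookup using the plain com.typesafe.config API (SeaTunnel itself uses a shaded copy under org.apache.seatunnel.shade); the demo class name and sample timestamp are hypothetical:

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class StartModeTimestampDemo {
    public static void main(String[] args) {
        // Hypothetical job fragment using the START_MODE_TIMESTAMP key above.
        Config config = ConfigFactory.parseString("start_mode.timestamp = 1667898142000");
        long startOffsetsTimestamp = config.getLong("start_mode.timestamp");
        // The same range check KafkaSource applies later in this commit: [0, now].
        if (startOffsetsTimestamp < 0 || startOffsetsTimestamp > System.currentTimeMillis()) {
            throw new IllegalArgumentException("start_mode.timestamp out of range: " + startOffsetsTimestamp);
        }
        System.out.println("start offsets timestamp: " + startOffsetsTimestamp);
    }
}
```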
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/StartMode.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/StartMode.java
new file mode 100644
index 000000000..21ec4ffb2
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/StartMode.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.config;
+
+public enum StartMode {
+
+    EARLIEST("earliest"),
+
+    GROUP_OFFSETS("group_offsets"),
+
+    LATEST("latest"),
+
+    TIMESTAMP("timestamp"),
+
+    SPECIFIC_OFFSETS("specific_offsets");
+
+    private String mode;
+
+    StartMode(String mode) {
+        this.mode = mode;
+    }
+
+    public String getMode() {
+        return mode;
+    }
+
+    @Override
+    public String toString() {
+        return mode;
+    }
+}
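KafkaSource (further down in this commit) maps the start_mode option onto this enum with StartMode.valueOf(value.toUpperCase()), so the accepted config values are exactly the lower-case mode names above. A minimal round-trip sketch, assuming the connector classes are on the classpath (the demo class name is hypothetical):

```java
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;

public class StartModeParseDemo {
    public static void main(String[] args) {
        // The same mapping KafkaSource performs on the start_mode option.
        StartMode mode = StartMode.valueOf("specific_offsets".toUpperCase());
        System.out.println(mode.getMode()); // specific_offsets
        System.out.println(mode);           // specific_offsets (toString() returns the mode string)
        // Any value outside the five modes, e.g. "newest", throws IllegalArgumentException here.
    }
}
```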
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
index ee942aaef..615d89416 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
@@ -17,7 +17,12 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.source;
 
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
+
+import org.apache.kafka.common.TopicPartition;
+
 import java.io.Serializable;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -31,6 +36,9 @@ public class ConsumerMetadata implements Serializable {
     private Properties properties;
     private String consumerGroup;
     private boolean commitOnCheckpoint = false;
+    private StartMode startMode = StartMode.GROUP_OFFSETS;
+    private Map<TopicPartition, Long> specificStartOffsets;
+    private Long startOffsetsTimestamp;
 
     public boolean isCommitOnCheckpoint() {
         return commitOnCheckpoint;
@@ -79,4 +87,28 @@ public class ConsumerMetadata implements Serializable {
     public void setConsumerGroup(String consumerGroup) {
         this.consumerGroup = consumerGroup;
     }
+
+    public StartMode getStartMode() {
+        return startMode;
+    }
+
+    public void setStartMode(StartMode startMode) {
+        this.startMode = startMode;
+    }
+
+    public Map<TopicPartition, Long> getSpecificStartOffsets() {
+        return specificStartOffsets;
+    }
+
+    public void setSpecificStartOffsets(Map<TopicPartition, Long> specificStartOffsets) {
+        this.specificStartOffsets = specificStartOffsets;
+    }
+
+    public Long getStartOffsetsTimestamp() {
+        return startOffsetsTimestamp;
+    }
+
+    public void setStartOffsetsTimestamp(Long startOffsetsTimestamp) {
+        this.startOffsetsTimestamp = startOffsetsTimestamp;
+    }
 }
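A short sketch of how the new fields are meant to be populated for specific_offsets; the demo class, group id, and offset values are hypothetical, and a no-arg ConsumerMetadata constructor is assumed:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.ConsumerMetadata;

public class ConsumerMetadataDemo {
    public static void main(String[] args) {
        // Start partition 0 of test_topic at offset 50 instead of the committed group offset.
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("test_topic", 0), 50L);

        ConsumerMetadata metadata = new ConsumerMetadata();
        metadata.setConsumerGroup("SeaTunnel-Consumer-Group"); // hypothetical group id
        metadata.setStartMode(StartMode.SPECIFIC_OFFSETS);
        metadata.setSpecificStartOffsets(offsets);
        System.out.println(metadata.getStartMode() + " -> " + metadata.getSpecificStartOffsets());
    }
}
```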
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 3c608f4a5..07882ea49 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -26,6 +26,9 @@ import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIEL
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHEMA;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_OFFSETS;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_TIMESTAMP;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 
 import org.apache.seatunnel.api.common.JobContext;
@@ -42,15 +45,22 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 import org.apache.seatunnel.format.text.TextDeserializationSchema;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.auto.service.AutoService;
+import org.apache.kafka.common.TopicPartition;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 @AutoService(SeaTunnelSource.class)
@@ -96,6 +106,40 @@ public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
             this.metadata.setCommitOnCheckpoint(config.getBoolean(COMMIT_ON_CHECKPOINT));
         }
 
+        if (config.hasPath(START_MODE)) {
+            StartMode startMode = StartMode.valueOf(config.getString(START_MODE).toUpperCase());
+            this.metadata.setStartMode(startMode);
+            switch (startMode) {
+                case TIMESTAMP:
+                    long startOffsetsTimestamp = config.getLong(START_MODE_TIMESTAMP);
+                    long currentTimestamp = System.currentTimeMillis();
+                    if (startOffsetsTimestamp < 0 || startOffsetsTimestamp > currentTimestamp) {
+                        throw new IllegalArgumentException("The value of start_mode.timestamp is less than 0 or greater than the current time");
+                    }
+                    this.metadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
+                    break;
+                case SPECIFIC_OFFSETS:
+                    Config offsets = config.getConfig(START_MODE_OFFSETS);
+                    ConfigRenderOptions options = ConfigRenderOptions.concise();
+                    String offsetsJson = offsets.root().render(options);
+                    if (offsetsJson == null) {
+                        throw new IllegalArgumentException("start mode is " + StartMode.SPECIFIC_OFFSETS + " but no specific offsets were specified.");
+                    }
+                    Map<TopicPartition, Long> specificStartOffsets = new HashMap<>();
+                    ObjectNode jsonNodes = JsonUtils.parseObject(offsetsJson);
+                    jsonNodes.fieldNames().forEachRemaining(key -> {
+                        String[] topicAndPartition = key.split("-");
+                        long offset = jsonNodes.get(key).asLong();
+                        TopicPartition topicPartition = new TopicPartition(topicAndPartition[0], Integer.valueOf(topicAndPartition[1]));
+                        specificStartOffsets.put(topicPartition, offset);
+                    });
+                    this.metadata.setSpecificStartOffsets(specificStartOffsets);
+                    break;
+                default:
+                    break;
+            }
+        }
+
         TypesafeConfigUtils.extractSubConfig(config, "kafka.", false).entrySet().forEach(e -> {
             this.metadata.getProperties().put(e.getKey(), String.valueOf(e.getValue().unwrapped()));
         });
@@ -144,9 +188,9 @@ public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
                     delimiter = config.getString(FIELD_DELIMITER);
                 }
                 deserializationSchema = TextDeserializationSchema.builder()
-                        .seaTunnelRowType(typeInfo)
-                        .delimiter(delimiter)
-                        .build();
+                    .seaTunnelRowType(typeInfo)
+                    .delimiter(delimiter)
+                    .build();
             } else {
                 // TODO: use format SPI
                 throw new UnsupportedOperationException("Unsupported format: " 
+ format);
@@ -154,9 +198,9 @@ public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
         } else {
             typeInfo = SeaTunnelSchema.buildSimpleTextSchema();
             this.deserializationSchema = TextDeserializationSchema.builder()
-                    .seaTunnelRowType(typeInfo)
-                    .delimiter(String.valueOf('\002'))
-                    .build();
+                .seaTunnelRowType(typeInfo)
+                .delimiter(String.valueOf('\002'))
+                .build();
         }
     }
 }
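The SPECIFIC_OFFSETS branch above derives a TopicPartition from each `<topic>-<partition>` key by splitting on `-`. A self-contained sketch of that parsing (class name and sample data are hypothetical); it shares the code's implicit assumption that topic names contain no hyphen, since split("-") would otherwise misplace the partition index:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

public class SpecificOffsetsParseDemo {
    public static void main(String[] args) {
        // Keys follow the "<topic>-<partition>" shape used by start_mode.offsets.
        Map<String, Long> raw = new HashMap<>();
        raw.put("test_topic-0", 50L);
        raw.put("test_topic-1", 10L);

        Map<TopicPartition, Long> specificStartOffsets = new HashMap<>();
        raw.forEach((key, offset) -> {
            // Same split("-") as KafkaSource: topic in [0], partition index in [1].
            String[] topicAndPartition = key.split("-");
            specificStartOffsets.put(
                new TopicPartition(topicAndPartition[0], Integer.parseInt(topicAndPartition[1])),
                offset);
        });
        // e.g. {test_topic-1=10, test_topic-0=50} (HashMap order is not guaranteed)
        System.out.println(specificStartOffsets);
    }
}
```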
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 0852fe8fd..59c1aba7f 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
@@ -33,7 +33,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -51,14 +50,14 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
     private final Context<KafkaSourceSplit> context;
     private AdminClient adminClient;
 
-    private Set<KafkaSourceSplit> pendingSplit;
-    private final Set<KafkaSourceSplit> assignedSplit;
+    private Map<TopicPartition, KafkaSourceSplit> pendingSplit;
+    private final Map<TopicPartition, KafkaSourceSplit> assignedSplit;
 
     KafkaSourceSplitEnumerator(ConsumerMetadata metadata, Context<KafkaSourceSplit> context) {
         this.metadata = metadata;
         this.context = context;
-        this.assignedSplit = new HashSet<>();
-        this.pendingSplit = new HashSet<>();
+        this.assignedSplit = new HashMap<>();
+        this.pendingSplit = new HashMap<>();
     }
 
     KafkaSourceSplitEnumerator(ConsumerMetadata metadata, Context<KafkaSourceSplit> context,
@@ -74,13 +73,45 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
     @Override
     public void run() throws ExecutionException, InterruptedException {
         getTopicInfo().forEach(split -> {
-            if (!assignedSplit.contains(split)) {
-                pendingSplit.add(split);
+            if (!assignedSplit.containsKey(split.getTopicPartition())) {
+                if (!pendingSplit.containsKey(split.getTopicPartition())) {
+                    pendingSplit.put(split.getTopicPartition(), split);
+                }
             }
         });
+        setPartitionStartOffset();
         assignSplit();
     }
 
+    private void setPartitionStartOffset() throws ExecutionException, InterruptedException {
+        Collection<TopicPartition> topicPartitions = pendingSplit.keySet();
+        Map<TopicPartition, Long> topicPartitionOffsets = null;
+        switch (metadata.getStartMode()) {
+            case EARLIEST:
+                topicPartitionOffsets = listOffsets(topicPartitions, OffsetSpec.earliest());
+                break;
+            case GROUP_OFFSETS:
+                topicPartitionOffsets = listConsumerGroupOffsets(topicPartitions);
+                break;
+            case LATEST:
+                topicPartitionOffsets = listOffsets(topicPartitions, OffsetSpec.latest());
+                break;
+            case TIMESTAMP:
+                topicPartitionOffsets = listOffsets(topicPartitions, OffsetSpec.forTimestamp(metadata.getStartOffsetsTimestamp()));
+                break;
+            case SPECIFIC_OFFSETS:
+                topicPartitionOffsets = metadata.getSpecificStartOffsets();
+                break;
+            default:
+                break;
+        }
+        topicPartitionOffsets.entrySet().forEach(entry -> {
+            if (pendingSplit.containsKey(entry.getKey())) {
+                pendingSplit.get(entry.getKey()).setStartOffset(entry.getValue());
+            }
+        });
+    }
+
     @Override
     public void close() throws IOException {
         if (this.adminClient != null) {
@@ -91,20 +122,20 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
     @Override
     public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
         if (!splits.isEmpty()) {
-            pendingSplit.addAll(convertToNextSplit(splits));
+            pendingSplit.putAll(convertToNextSplit(splits));
             assignSplit();
         }
     }
 
-    private Collection<? extends KafkaSourceSplit> convertToNextSplit(List<KafkaSourceSplit> splits) {
+    private Map<TopicPartition, ? extends KafkaSourceSplit> convertToNextSplit(List<KafkaSourceSplit> splits) {
         try {
-            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> listOffsets =
-                getKafkaPartitionLatestOffset(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()));
+            Map<TopicPartition, Long> listOffsets =
+                listOffsets(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()), OffsetSpec.latest());
             splits.forEach(split -> {
                 split.setStartOffset(split.getEndOffset() + 1);
-                split.setEndOffset(listOffsets.get(split.getTopicPartition()).offset());
+                split.setEndOffset(listOffsets.get(split.getTopicPartition()));
             });
-            return splits;
+            return splits.stream().collect(Collectors.toMap(split -> split.getTopicPartition(), split -> split));
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -129,7 +160,7 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
 
     @Override
     public KafkaSourceState snapshotState(long checkpointId) throws Exception {
-        return new KafkaSourceState(assignedSplit);
+        return new KafkaSourceState(assignedSplit.values().stream().collect(Collectors.toSet()));
     }
 
     @Override
@@ -158,34 +189,30 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
         Collection<TopicPartition> partitions =
             adminClient.describeTopics(topics).all().get().values().stream().flatMap(t -> t.partitions().stream()
                 .map(p -> new TopicPartition(t.name(), p.partition()))).collect(Collectors.toSet());
-        return getKafkaPartitionLatestOffset(partitions).entrySet().stream().map(partition -> {
-            KafkaSourceSplit split = new KafkaSourceSplit(partition.getKey());
-            split.setEndOffset(partition.getValue().offset());
+        Map<TopicPartition, Long> latestOffsets = listOffsets(partitions, OffsetSpec.latest());
+        return partitions.stream().map(partition -> {
+            KafkaSourceSplit split = new KafkaSourceSplit(partition);
+            split.setEndOffset(latestOffsets.get(split.getTopicPartition()));
             return split;
         }).collect(Collectors.toSet());
     }
 
-    private Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> getKafkaPartitionLatestOffset(Collection<TopicPartition> partitions) throws InterruptedException, ExecutionException {
-        return adminClient.listOffsets(partitions.stream().collect(Collectors.toMap(p -> p, p -> OffsetSpec.latest())))
-            .all().get();
-    }
-
     private void assignSplit() {
         Map<Integer, List<KafkaSourceSplit>> readySplit = new HashMap<>(Common.COLLECTION_SIZE);
         for (int taskID = 0; taskID < context.currentParallelism(); taskID++) {
             readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
         }
 
-        pendingSplit.forEach(s -> {
-            if (!assignedSplit.contains(s)) {
-                readySplit.get(getSplitOwner(s.getTopicPartition(), context.currentParallelism()))
-                    .add(s);
+        pendingSplit.entrySet().forEach(s -> {
+            if (!assignedSplit.containsKey(s.getKey())) {
+                readySplit.get(getSplitOwner(s.getKey(), context.currentParallelism()))
+                    .add(s.getValue());
             }
         });
 
         readySplit.forEach(context::assignSplit);
 
-        assignedSplit.addAll(pendingSplit);
+        assignedSplit.putAll(pendingSplit);
         pendingSplit.clear();
     }
 
@@ -195,4 +222,42 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
         return (startIndex + tp.partition()) % numReaders;
     }
 
+    private Map<TopicPartition, Long> listOffsets(Collection<TopicPartition> partitions, OffsetSpec offsetSpec) throws ExecutionException, InterruptedException {
+        Map<TopicPartition, OffsetSpec> topicPartitionOffsets = partitions.stream().collect(Collectors.toMap(partition -> partition, __ -> offsetSpec));
+
+        return adminClient.listOffsets(topicPartitionOffsets).all()
+            .thenApply(
+                result -> {
+                    Map<TopicPartition, Long> offsets = new HashMap<>();
+                    result.forEach(
+                        (tp, offsetsResultInfo) -> {
+                            if (offsetsResultInfo != null) {
+                                offsets.put(tp, offsetsResultInfo.offset());
+                            }
+                        });
+                    return offsets;
+                })
+            .get();
+    }
+
+    public Map<TopicPartition, Long> listConsumerGroupOffsets(Collection<TopicPartition> partitions) throws ExecutionException, InterruptedException {
+        ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions().topicPartitions(new ArrayList<>(partitions));
+        return adminClient
+            .listConsumerGroupOffsets(metadata.getConsumerGroup(), options)
+            .partitionsToOffsetAndMetadata()
+        return adminClient
+            .listConsumerGroupOffsets(metadata.getConsumerGroup(), options)
+            .partitionsToOffsetAndMetadata()
+            .thenApply(
+                result -> {
+                    Map<TopicPartition, Long> offsets = new HashMap<>();
+                    result.forEach(
+                        (tp, oam) -> {
+                            if (oam != null) {
+                                offsets.put(tp, oam.offset());
+                            }
+                        });
+                    return offsets;
+                })
+            .get();
+    }
+
 }
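Every start mode except SPECIFIC_OFFSETS is resolved through AdminClient.listOffsets with a per-mode OffsetSpec (earliest, latest, or forTimestamp), as the switch above shows. A standalone sketch of that call; the broker address, topic, and class name are assumptions:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class ListOffsetsDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        try (AdminClient adminClient = AdminClient.create(props)) {
            // OffsetSpec.earliest() corresponds to start_mode = earliest;
            // latest() and forTimestamp(ms) cover the latest and timestamp modes.
            Map<TopicPartition, OffsetSpec> request = new HashMap<>();
            request.put(new TopicPartition("test_topic", 0), OffsetSpec.earliest());
            adminClient.listOffsets(request).all().get()
                .forEach((tp, info) -> System.out.println(tp + " -> " + info.offset()));
        }
    }
}
```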
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java
index e3468e8dd..c1499b14e 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java
@@ -27,69 +27,28 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.Container;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.com.google.common.collect.Lists;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
 
 /**
  * This test case is used to verify that the kafka source is able to send data to the console.
- * Make sure the SeaTunnel job can submit successfully on spark engine.
+ * Make sure the SeaTunnel job can submit successfully on flink engine.
  */
 @Slf4j
-public class KafkaSourceJsonToConsoleIT extends FlinkContainer {
-
-    private static final int KAFKA_PORT = 9093;
-
-    private static final String KAFKA_HOST = "kafkaCluster";
-
-    private KafkaProducer<byte[], byte[]> producer;
-
-    private KafkaContainer kafkaContainer;
-
-    @BeforeEach
-    public void startKafkaContainer() {
-        kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
-                .withNetwork(NETWORK)
-                .withNetworkAliases(KAFKA_HOST)
-                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
-        kafkaContainer.setPortBindings(Lists.newArrayList(
-                String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
-        Startables.deepStart(Stream.of(kafkaContainer)).join();
-        log.info("Kafka container started");
-        Awaitility.given().ignoreExceptions()
-                .atLeast(100, TimeUnit.MILLISECONDS)
-                .pollInterval(500, TimeUnit.MILLISECONDS)
-                .atMost(180, TimeUnit.SECONDS)
-                .untilAsserted(() -> initKafkaProducer());
-        generateTestData();
-    }
+public class KafkaSourceJsonToConsoleIT extends KafkaTestBaseIT {
 
     @SuppressWarnings("checkstyle:Indentation")
-    private void generateTestData() {
+    protected void generateTestData() {
 
         SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
                 new String[]{
@@ -160,22 +119,4 @@ public class KafkaSourceJsonToConsoleIT extends FlinkContainer {
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
     }
 
-    private void initKafkaProducer() {
-        Properties props = new Properties();
-        String bootstrapServers = kafkaContainer.getBootstrapServers();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        producer = new KafkaProducer<>(props);
-    }
-
-    @AfterEach
-    public void close() {
-        if (producer != null) {
-            producer.close();
-        }
-        if (kafkaContainer != null) {
-            kafkaContainer.close();
-        }
-    }
 }
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
new file mode 100644
index 000000000..1d7225ae8
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.kafka;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+@Slf4j
+public class KafkaSourceStartConfigToConsoleIT extends KafkaTestBaseIT {
+    @Override
+    protected void generateTestData() {
+        generateStepTestData(0, 100);
+    }
+
+    private void generateStepTestData(int start, int end) {
+
+        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+            new String[]{
+                "id"
+            },
+            new SeaTunnelDataType[]{
+                BasicType.LONG_TYPE
+            }
+        );
+
+        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
+        for (int i = start; i < end; i++) {
+            SeaTunnelRow row = new SeaTunnelRow(
+                new Object[]{
+                    Long.valueOf(i)
+                });
+            ProducerRecord<byte[], byte[]> producerRecord = serializer.serializeRow(row);
+            producer.send(producerRecord);
+        }
+    }
+
+    @Test
+    public void testKafka() throws IOException, InterruptedException {
+        testKafkaLatestToConsole();
+        testKafkaEarliestToConsole();
+        testKafkaSpecificOffsetsToConsole();
+        testKafkaGroupOffsetsToConsole();
+        testKafkaTimestampToConsole();
+    }
+
+    public void testKafkaLatestToConsole() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_latest_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    public void testKafkaEarliestToConsole() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_earliest_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    public void testKafkaSpecificOffsetsToConsole() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_specific_offsets_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    public void testKafkaGroupOffsetsToConsole() throws IOException, InterruptedException {
+        generateStepTestData(100, 150);
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_group_offset_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    public void testKafkaTimestampToConsole() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_timestamp_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java
index f0855b455..bfe37078d 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java
@@ -26,70 +26,28 @@ import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
 import org.apache.seatunnel.format.text.TextSerializationSchema;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.Container;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.com.google.common.collect.Lists;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
 
 /**
  * This test case is used to verify that the kafka source is able to send data to the console.
- * Make sure the SeaTunnel job can submit successfully on spark engine.
+ * Make sure the SeaTunnel job can submit successfully on flink engine.
  */
 @Slf4j
-public class KafkaSourceTextToConsoleIT extends FlinkContainer {
-
-    private static final int KAFKA_PORT = 9093;
-
-    private static final String KAFKA_HOST = "kafkaCluster";
-
-    private KafkaProducer<byte[], byte[]> producer;
-
-    private KafkaContainer kafkaContainer;
-
-    @BeforeEach
-    public void startKafkaContainer() {
-        kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
-                .withNetwork(NETWORK)
-                .withNetworkAliases(KAFKA_HOST)
-                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
-        kafkaContainer.setPortBindings(Lists.newArrayList(
-                String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
-        Startables.deepStart(Stream.of(kafkaContainer)).join();
-        log.info("Kafka container started");
-        Awaitility.given().ignoreExceptions()
-                .atLeast(100, TimeUnit.MILLISECONDS)
-                .pollInterval(500, TimeUnit.MILLISECONDS)
-                .atMost(180, TimeUnit.SECONDS)
-                .untilAsserted(() -> initKafkaProducer());
-        generateTestData();
-    }
-
+public class KafkaSourceTextToConsoleIT extends KafkaTestBaseIT {
     @SuppressWarnings("checkstyle:Indentation")
-    private void generateTestData() {
+    protected void generateTestData() {
 
         SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
                 new String[]{
@@ -163,22 +121,4 @@ public class KafkaSourceTextToConsoleIT extends FlinkContainer {
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
     }
 
-    private void initKafkaProducer() {
-        Properties props = new Properties();
-        String bootstrapServers = kafkaContainer.getBootstrapServers();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        producer = new KafkaProducer<>(props);
-    }
-
-    @AfterEach
-    public void close() {
-        if (producer != null) {
-            producer.close();
-        }
-        if (kafkaContainer != null) {
-            kafkaContainer.close();
-        }
-    }
 }
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java
new file mode 100644
index 000000000..0c339cccc
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.kafka;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class KafkaTestBaseIT extends FlinkContainer {
+    protected static final int KAFKA_PORT = 9093;
+
+    protected static final String KAFKA_HOST = "kafkaCluster";
+
+    protected KafkaProducer<byte[], byte[]> producer;
+
+    protected KafkaContainer kafkaContainer;
+
+    @BeforeEach
+    public void startKafkaContainer() {
+        kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+            .withNetwork(NETWORK)
+            .withNetworkAliases(KAFKA_HOST)
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
+        kafkaContainer.setPortBindings(Lists.newArrayList(
+            String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+        Startables.deepStart(Stream.of(kafkaContainer)).join();
+        log.info("Kafka container started");
+        Awaitility.given().ignoreExceptions()
+            .atLeast(100, TimeUnit.MILLISECONDS)
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .atMost(180, TimeUnit.SECONDS)
+            .untilAsserted(() -> initKafkaProducer());
+        generateTestData();
+    }
+
+    protected void initKafkaProducer() {
+        Properties props = new Properties();
+        String bootstrapServers = kafkaContainer.getBootstrapServers();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producer = new KafkaProducer<>(props);
+    }
+
+    @SuppressWarnings("checkstyle:Indentation")
+    protected void generateTestData() {
+
+    }
+
+    @AfterEach
+    public void close() {
+        if (producer != null) {
+            producer.close();
+        }
+        if (kafkaContainer != null) {
+            kafkaContainer.close();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
similarity index 51%
copy from seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
index 91c920237..97f9ddb5d 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
@@ -19,38 +19,25 @@
 ######
 
 env {
-    # You can set spark configuration here
-    spark.app.name = "SeaTunnel"
-    source.parallelism = 1
-    job.mode = "BATCH"
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
-
 source {
   Kafka {
     bootstrap.servers = "kafkaCluster:9093"
     topic = "test_topic"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    # The format option is optional and defaults to json
+    format = json
+    start_mode = earliest
     schema = {
-      fields {
-           id = bigint
-           c_map = "map<string, smallint>"
-           c_array = "array<tinyint>"
-           c_string = string
-           c_boolean = boolean
-           c_tinyint = tinyint
-           c_smallint = smallint
-           c_int = int
-           c_bigint = bigint
-           c_float = float
-           c_double = double
-           c_decimal = "decimal(2, 1)"
-           c_bytes = bytes
-           c_date = date
-           c_timestamp = timestamp
-      }
-    }
+      fields {
+        id = bigint
+      }
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
@@ -58,33 +45,31 @@ source {
 }
 
 transform {
-}
+ }
 
-sink {
-  Console {}
-  Assert {
-     rules =
-       {
-         field_rules = [
-                {
-             field_name = id
-             field_type = long
-             field_value = [
-                 {
-                     rule_type = NOT_NULL
-                 },
-                 {
-                     rule_type = MIN
-                     rule_value = 0
-                 },
-                 {
-                     rule_type = MAX
-                     rule_value = 99
-                 }
-             ]
-          }
-          ]
-       }
+sink {
+  Console {}
+
+  Assert {
+    rules = {
+      field_rules = [
+        {
+          field_name = id
+          field_type = long
+          field_value = [
 
-   }
-}
\ No newline at end of file
+            {
+              rule_type = MIN
+              rule_value = 0
+            },
+            {
+              rule_type = MAX
+              rule_value = 99
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
similarity index 51%
copy from seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
index 91c920237..2a7f9828c 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
@@ -19,38 +19,25 @@
 ######
 
 env {
-    # You can set spark configuration here
-    spark.app.name = "SeaTunnel"
-    source.parallelism = 1
-    job.mode = "BATCH"
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
-
 source {
   Kafka {
     bootstrap.servers = "kafkaCluster:9093"
     topic = "test_topic"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    # The format option is optional and defaults to json
+    format = json
+    start_mode = group_offsets
     schema = {
-      fields {
-           id = bigint
-           c_map = "map<string, smallint>"
-           c_array = "array<tinyint>"
-           c_string = string
-           c_boolean = boolean
-           c_tinyint = tinyint
-           c_smallint = smallint
-           c_int = int
-           c_bigint = bigint
-           c_float = float
-           c_double = double
-           c_decimal = "decimal(2, 1)"
-           c_bytes = bytes
-           c_date = date
-           c_timestamp = timestamp
-      }
-    }
+      fields {
+        id = bigint
+      }
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
@@ -58,33 +45,31 @@ source {
 }
 
 transform {
-}
+ }
 
-sink {
-  Console {}
-  Assert {
-     rules =
-       {
-         field_rules = [
-                {
-             field_name = id
-             field_type = long
-             field_value = [
-                 {
-                     rule_type = NOT_NULL
-                 },
-                 {
-                     rule_type = MIN
-                     rule_value = 0
-                 },
-                 {
-                     rule_type = MAX
-                     rule_value = 99
-                 }
-             ]
-          }
-          ]
-       }
+sink {
+  Console {}
+
+  Assert {
+    rules = {
+      field_rules = [
+        {
+          field_name = id
+          field_type = long
+          field_value = [
 
-   }
-}
\ No newline at end of file
+            {
+              rule_type = MIN
+              rule_value = 100
+            },
+            {
+              rule_type = MAX
+              rule_value = 149
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
similarity index 51%
copy from seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
index 91c920237..736104ea0 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
@@ -19,38 +19,25 @@
 ######
 
 env {
-    # You can set spark configuration here
-    spark.app.name = "SeaTunnel"
-    source.parallelism = 1
-    job.mode = "BATCH"
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
-
 source {
   Kafka {
     bootstrap.servers = "kafkaCluster:9093"
     topic = "test_topic"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    # The format option is optional and defaults to json
+    format = json
+    start_mode = latest
     schema = {
-      fields {
-           id = bigint
-           c_map = "map<string, smallint>"
-           c_array = "array<tinyint>"
-           c_string = string
-           c_boolean = boolean
-           c_tinyint = tinyint
-           c_smallint = smallint
-           c_int = int
-           c_bigint = bigint
-           c_float = float
-           c_double = double
-           c_decimal = "decimal(2, 1)"
-           c_bytes = bytes
-           c_date = date
-           c_timestamp = timestamp
-      }
-    }
+      fields {
+        id = bigint
+      }
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
@@ -58,33 +45,31 @@ source {
 }
 
 transform {
-}
+ }
 
-sink {
-  Console {}
-  Assert {
-     rules =
-       {
-         field_rules = [
-                {
-             field_name = id
-             field_type = long
-             field_value = [
-                 {
-                     rule_type = NOT_NULL
-                 },
-                 {
-                     rule_type = MIN
-                     rule_value = 0
-                 },
-                 {
-                     rule_type = MAX
-                     rule_value = 99
-                 }
-             ]
-          }
-          ]
-       }
+sink {
+  Console {}
+
+  Assert {
+    rules = {
+      field_rules = [
+        {
+          field_name = id
+          field_type = long
+          field_value = [
 
-   }
-}
\ No newline at end of file
+            {
+              rule_type = MIN
+              rule_value = 99
+            },
+            {
+              rule_type = MAX
+              rule_value = 99
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
similarity index 51%
copy from seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
index 91c920237..b75756327 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
@@ -19,38 +19,29 @@
 ######
 
 env {
-    # You can set spark configuration here
-    spark.app.name = "SeaTunnel"
-    source.parallelism = 1
-    job.mode = "BATCH"
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
-
 source {
   Kafka {
     bootstrap.servers = "kafkaCluster:9093"
     topic = "test_topic"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    # The format defaults to json, so this option is optional
+    format = json
+    start_mode = specific_offsets
     schema = {
-      fields {
-           id = bigint
-           c_map = "map<string, smallint>"
-           c_array = "array<tinyint>"
-           c_string = string
-           c_boolean = boolean
-           c_tinyint = tinyint
-           c_smallint = smallint
-           c_int = int
-           c_bigint = bigint
-           c_float = float
-           c_double = double
-           c_decimal = "decimal(2, 1)"
-           c_bytes = bytes
-           c_date = date
-           c_timestamp = timestamp
-      }
-    }
+      fields {
+        id = bigint
+      }
+    }
+
+    start_mode.offsets = {
+      test_topic-0 = 50
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
@@ -58,33 +49,31 @@ source {
 }
 
 transform {
-}
+ }
 
-sink {
-  Console {}
-  Assert {
-     rules =
-       {
-         field_rules = [
-                {
-             field_name = id
-             field_type = long
-             field_value = [
-                 {
-                     rule_type = NOT_NULL
-                 },
-                 {
-                     rule_type = MIN
-                     rule_value = 0
-                 },
-                 {
-                     rule_type = MAX
-                     rule_value = 99
-                 }
-             ]
-          }
-          ]
-       }
+sink {
+  Console {}
+
+  Assert {
+    rules =
+      {
+        field_rules = [
+          {
+            field_name = id
+            field_type = long
+            field_value = [
 
-   }
-}
\ No newline at end of file
+              {
+                rule_type = MIN
+                rule_value = 50
+              },
+              {
+                rule_type = MAX
+                rule_value = 99
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
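
The start_mode.offsets map above keys each entry as "<topic>-<partition>",
so test_topic-0 = 50 starts partition 0 of test_topic at offset 50, which
is why this job's Assert block expects ids 50 through 99. A sketch of what
one such entry boils down to with the stock consumer (the key parsing is
illustrative, not the connector's actual code; "consumer" is configured as
in the earlier sketch):

    // "test_topic-0 = 50" -> seek partition 0 of test_topic to offset 50
    String key = "test_topic-0";
    long offset = 50L;
    int sep = key.lastIndexOf('-');
    TopicPartition tp = new TopicPartition(key.substring(0, sep),
            Integer.parseInt(key.substring(sep + 1)));
    consumer.assign(Collections.singletonList(tp));
    consumer.seek(tp, offset);
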
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
similarity index 51%
copy from 
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to 
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
index 91c920237..b491c6546 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
@@ -19,38 +19,26 @@
 ######
 
 env {
-    # You can set spark configuration here
-    spark.app.name = "SeaTunnel"
-    source.parallelism = 1
-    job.mode = "BATCH"
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
-
 source {
   Kafka {
     bootstrap.servers = "kafkaCluster:9093"
     topic = "test_topic"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    # The format defaults to json, so this option is optional
+    format = json
+    start_mode = timestamp
     schema = {
-      fields {
-           id = bigint
-           c_map = "map<string, smallint>"
-           c_array = "array<tinyint>"
-           c_string = string
-           c_boolean = boolean
-           c_tinyint = tinyint
-           c_smallint = smallint
-           c_int = int
-           c_bigint = bigint
-           c_float = float
-           c_double = double
-           c_decimal = "decimal(2, 1)"
-           c_bytes = bytes
-           c_date = date
-           c_timestamp = timestamp
-      }
-    }
+      fields {
+        id = bigint
+      }
+    }
+    start_mode.timestamp = 1667179890315
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
@@ -58,33 +46,31 @@ source {
 }
 
 transform {
-}
+ }
 
-sink {
-  Console {}
-  Assert {
-     rules =
-       {
-         field_rules = [
-                {
-             field_name = id
-             field_type = long
-             field_value = [
-                 {
-                     rule_type = NOT_NULL
-                 },
-                 {
-                     rule_type = MIN
-                     rule_value = 0
-                 },
-                 {
-                     rule_type = MAX
-                     rule_value = 99
-                 }
-             ]
-          }
-          ]
-       }
+sink {
+  Console {}
+
+  Assert {
+    rules =
+      {
+        field_rules = [
+          {
+            field_name = id
+            field_type = long
+            field_value = [
 
-   }
-}
\ No newline at end of file
+              {
+                rule_type = MIN
+                rule_value = 0
+              },
+              {
+                rule_type = MAX
+                rule_value = 149
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
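
start_mode.timestamp takes an epoch-millisecond value. With the stock
client, the natural resolution is offsetsForTimes, which maps a timestamp
to the earliest offset whose record timestamp is at or after it; whether
the connector falls back exactly this way when no record matches is an
implementation detail, so treat this as a sketch only ("consumer" as in
the first sketch, plus org.apache.kafka.clients.consumer.OffsetAndTimestamp):

    TopicPartition tp = new TopicPartition("test_topic", 0);
    long startMillis = 1667179890315L;  // the start_mode.timestamp value above
    OffsetAndTimestamp hit = consumer
            .offsetsForTimes(Collections.singletonMap(tp, startMillis))
            .get(tp);
    consumer.assign(Collections.singletonList(tp));
    if (hit != null) {
        consumer.seek(tp, hit.offset());  // first record at/after the timestamp
    } else {
        consumer.seekToEnd(Collections.singletonList(tp));  // nothing newer exists
    }
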
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
index 431df9205..117daf78e 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
@@ -35,7 +35,7 @@ public class KafkaContainer extends 
GenericContainer<KafkaContainer> {
 
     private static final DockerImageName DEFAULT_IMAGE_NAME = 
DockerImageName.parse("confluentinc/cp-kafka");
 
-    public static final int KAFKA_PORT = 9093;
+    public static final int KAFKA_PORT = 9094;
 
     public static final int ZOOKEEPER_PORT = 2181;
 
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java
index 8b9a425a9..de6abf829 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java
@@ -27,68 +27,28 @@ import 
org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
-import org.apache.seatunnel.e2e.spark.SparkContainer;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.Container;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.com.google.common.collect.Lists;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerImageName;
 
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
 /**
  * This test case is used to verify that the kafka source is able to send data 
to the console.
  * Make sure the SeaTunnel job can submit successfully on spark engine.
  */
 @SuppressWarnings("checkstyle:EmptyLineSeparator")
 @Slf4j
-public class KafkaSourceJsonToConsoleIT extends SparkContainer {
-
-    private static final int KAFKA_PORT = 9093;
-
-    private static final String KAFKA_HOST = "kafkaCluster";
-
-    private KafkaProducer<byte[], byte[]> producer;
-
-    private KafkaContainer kafkaContainer;
-
-    @BeforeEach
-    public void startKafkaContainer() {
-        kafkaContainer = new 
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
-                .withNetwork(NETWORK)
-                .withNetworkAliases(KAFKA_HOST)
-                .withLogConsumer(new Slf4jLogConsumer(log));
-        kafkaContainer.setPortBindings(Lists.newArrayList(
-                String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
-        Startables.deepStart(Stream.of(kafkaContainer)).join();
-        log.info("Kafka container started");
-        Awaitility.given().ignoreExceptions()
-                .atLeast(100, TimeUnit.MILLISECONDS)
-                .pollInterval(500, TimeUnit.MILLISECONDS)
-                .atMost(180, TimeUnit.SECONDS)
-                .untilAsserted(() -> initKafkaProducer());
-        generateTestData();
-    }
+public class KafkaSourceJsonToConsoleIT extends KafkaTestBaseIT {
 
     @SuppressWarnings("checkstyle:Indentation")
-    private void generateTestData() {
+    protected void generateTestData() {
 
         SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
                 new String[]{
@@ -159,22 +119,4 @@ public class KafkaSourceJsonToConsoleIT extends 
SparkContainer {
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
     }
 
-    private void initKafkaProducer() {
-        Properties props = new Properties();
-        String bootstrapServers = kafkaContainer.getBootstrapServers();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);
-        producer = new KafkaProducer<>(props);
-    }
-
-    @AfterEach
-    public void close() {
-        if (producer != null) {
-            producer.close();
-        }
-        if (kafkaContainer != null) {
-            kafkaContainer.close();
-        }
-    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
new file mode 100644
index 000000000..3ff1faff3
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.kafka;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+@Slf4j
+public class KafkaSourceStartConfigToConsoleIT extends KafkaTestBaseIT {
+    @Override
+    protected void generateTestData() {
+        generateStepTestData(0, 100);
+    }
+
+    private void generateStepTestData(int start, int end) {
+
+        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+            new String[]{
+                "id"
+            },
+            new SeaTunnelDataType[]{
+                BasicType.LONG_TYPE
+            }
+        );
+
+        DefaultSeaTunnelRowSerializer serializer = new 
DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
+        for (int i = start; i < end; i++) {
+            SeaTunnelRow row = new SeaTunnelRow(
+                new Object[]{
+                    Long.valueOf(i)
+                });
+            ProducerRecord<byte[], byte[]> producerRecord = 
serializer.serializeRow(row);
+            producer.send(producerRecord);
+        }
+    }
+
+    @Test
+    public void testKafka() throws IOException, InterruptedException {
+        testKafkaLatestToConsole();
+        testKafkaEarliestToConsole();
+        testKafkaSpecificOffsetsToConsole();
+        testKafkaGroupOffsetsToConsole();
+        testKafkaTimestampToConsole();
+    }
+
+    public void testKafkaLatestToConsole() throws IOException, 
InterruptedException {
+        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/kafka/kafkasource_latest_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+    }
+
+    public void testKafkaEarliestToConsole() throws IOException, 
InterruptedException {
+        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/kafka/kafkasource_earliest_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+    }
+
+    public void testKafkaSpecificOffsetsToConsole() throws IOException, 
InterruptedException {
+        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/kafka/kafkasource_specific_offsets_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+    }
+
+    public void testKafkaGroupOffsetsToConsole() throws IOException, 
InterruptedException {
+        generateStepTestData(100, 150);
+        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/kafka/kafkasource_group_offset_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+    }
+
+    public void testKafkaTimestampToConsole() throws IOException, 
InterruptedException {
+        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/kafka/kafkasource_timestamp_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+    }
+
+}
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
index c2842557c..f4575f4e8 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
@@ -26,70 +26,29 @@ import 
org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.e2e.spark.SparkContainer;
 import org.apache.seatunnel.format.text.TextSerializationSchema;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.Container;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.com.google.common.collect.Lists;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
 
 /**
  * This test case is used to verify that the kafka source is able to send data 
to the console.
  * Make sure the SeaTunnel job can submit successfully on spark engine.
  */
 @Slf4j
-public class KafkaSourceTextToConsoleIT extends SparkContainer {
-
-    private static final int KAFKA_PORT = 9093;
-
-    private static final String KAFKA_HOST = "kafkaCluster";
-
-    private KafkaProducer<byte[], byte[]> producer;
-
-    private KafkaContainer kafkaContainer;
-
-    @BeforeEach
-    public void startKafkaContainer() {
-        kafkaContainer = new 
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
-                .withNetwork(NETWORK)
-                .withNetworkAliases(KAFKA_HOST)
-                .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
-        kafkaContainer.setPortBindings(Lists.newArrayList(
-                String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
-        Startables.deepStart(Stream.of(kafkaContainer)).join();
-        log.info("Kafka container started");
-        Awaitility.given().ignoreExceptions()
-                .atLeast(100, TimeUnit.MILLISECONDS)
-                .pollInterval(500, TimeUnit.MILLISECONDS)
-                .atMost(180, TimeUnit.SECONDS)
-                .untilAsserted(() -> initKafkaProducer());
-        generateTestData();
-    }
+public class KafkaSourceTextToConsoleIT extends KafkaTestBaseIT {
 
     @SuppressWarnings("checkstyle:Indentation")
-    private void generateTestData() {
+    protected void generateTestData() {
 
         SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
                 new String[]{
@@ -163,22 +122,4 @@ public class KafkaSourceTextToConsoleIT extends 
SparkContainer {
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
     }
 
-    private void initKafkaProducer() {
-        Properties props = new Properties();
-        String bootstrapServers = kafkaContainer.getBootstrapServers();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);
-        producer = new KafkaProducer<>(props);
-    }
-
-    @AfterEach
-    public void close() {
-        if (producer != null) {
-            producer.close();
-        }
-        if (kafkaContainer != null) {
-            kafkaContainer.close();
-        }
-    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java
new file mode 100644
index 000000000..b6d5354a9
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.kafka;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class KafkaTestBaseIT extends SparkContainer {
+    protected static final int KAFKA_PORT = 9094;
+
+    protected static final String KAFKA_HOST = "kafkaCluster";
+
+    protected KafkaProducer<byte[], byte[]> producer;
+
+    protected KafkaContainer kafkaContainer;
+
+    @BeforeEach
+    public void startKafkaContainer() {
+        kafkaContainer = new 
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+            .withNetwork(NETWORK)
+            .withNetworkAliases(KAFKA_HOST)
+            .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
+        kafkaContainer.setPortBindings(Lists.newArrayList(
+            String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+        Startables.deepStart(Stream.of(kafkaContainer)).join();
+        log.info("Kafka container started");
+        Awaitility.given().ignoreExceptions()
+            .atLeast(100, TimeUnit.MILLISECONDS)
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .atMost(180, TimeUnit.SECONDS)
+            .untilAsserted(() -> initKafkaProducer());
+        generateTestData();
+    }
+
+    protected void initKafkaProducer() {
+        Properties props = new Properties();
+        String bootstrapServers = kafkaContainer.getBootstrapServers();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);
+        producer = new KafkaProducer<>(props);
+    }
+
+    @SuppressWarnings("checkstyle:Indentation")
+    protected void generateTestData() {
+
+    }
+
+    @AfterEach
+    public void close() {
+        if (producer != null) {
+            producer.close();
+        }
+        if (kafkaContainer != null) {
+            kafkaContainer.close();
+        }
+    }
+}
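
KafkaTestBaseIT is a small template class: startKafkaContainer() boots the
broker, waits until the byte-array producer comes up, then calls
generateTestData(), which subclasses override to seed the topic before the
SeaTunnel job runs. A minimal hypothetical subclass (class name and payload
invented for illustration):

    public class MyKafkaIT extends KafkaTestBaseIT {
        @Override
        protected void generateTestData() {
            // 'producer' has already been initialized by the base class here
            byte[] value = "{\"id\": 1}".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            producer.send(new org.apache.kafka.clients.producer.ProducerRecord<>(
                    "test_topic", null, value));
        }
    }
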
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
similarity index 53%
copy from 
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to 
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
index 91c920237..c1674e192 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
@@ -20,37 +20,24 @@
 
 env {
     # You can set spark configuration here
-    spark.app.name = "SeaTunnel"
+    job.name = "SeaTunnel"
     source.parallelism = 1
     job.mode = "BATCH"
 }
 
-
 source {
   Kafka {
-    bootstrap.servers = "kafkaCluster:9093"
+    bootstrap.servers = "kafkaCluster:9094"
     topic = "test_topic"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    # The format defaults to json, so this option is optional
+    format = json
+    start_mode = earliest
     schema = {
-      fields {
-           id = bigint
-           c_map = "map<string, smallint>"
-           c_array = "array<tinyint>"
-           c_string = string
-           c_boolean = boolean
-           c_tinyint = tinyint
-           c_smallint = smallint
-           c_int = int
-           c_bigint = bigint
-           c_float = float
-           c_double = double
-           c_decimal = "decimal(2, 1)"
-           c_bytes = bytes
-           c_date = date
-           c_timestamp = timestamp
-      }
-    }
+      fields {
+        id = bigint
+      }
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
@@ -58,33 +45,31 @@ source {
 }
 
 transform {
-}
+ }
 
-sink {
-  Console {}
-  Assert {
-     rules =
-       {
-         field_rules = [
-                {
-             field_name = id
-             field_type = long
-             field_value = [
-                 {
-                     rule_type = NOT_NULL
-                 },
-                 {
-                     rule_type = MIN
-                     rule_value = 0
-                 },
-                 {
-                     rule_type = MAX
-                     rule_value = 99
-                 }
-             ]
-          }
-          ]
-       }
+sink {
+  Console {}
+
+  Assert {
+    rules =
+      {
+        field_rules = [
+          {
+            field_name = id
+            field_type = long
+            field_value = [
 
-   }
-}
\ No newline at end of file
+              {
+                rule_type = MIN
+                rule_value = 0
+              },
+              {
+                rule_type = MAX
+                rule_value = 99
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
similarity index 53%
copy from 
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to 
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
index 91c920237..336d364c0 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
@@ -20,37 +20,24 @@
 
 env {
     # You can set spark configuration here
-    spark.app.name = "SeaTunnel"
+    job.name = "SeaTunnel"
     source.parallelism = 1
     job.mode = "BATCH"
 }
 
-
 source {
   Kafka {
-    bootstrap.servers = "kafkaCluster:9093"
+    bootstrap.servers = "kafkaCluster:9094"
     topic = "test_topic"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    # The format defaults to json, so this option is optional
+    format = json
+    start_mode = group_offsets
     schema = {
-      fields {
-           id = bigint
-           c_map = "map<string, smallint>"
-           c_array = "array<tinyint>"
-           c_string = string
-           c_boolean = boolean
-           c_tinyint = tinyint
-           c_smallint = smallint
-           c_int = int
-           c_bigint = bigint
-           c_float = float
-           c_double = double
-           c_decimal = "decimal(2, 1)"
-           c_bytes = bytes
-           c_date = date
-           c_timestamp = timestamp
-      }
-    }
+      fields {
+        id = bigint
+      }
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
@@ -58,33 +45,31 @@ source {
 }
 
 transform {
-}
+ }
 
-sink {
-  Console {}
-  Assert {
-     rules =
-       {
-         field_rules = [
-                {
-             field_name = id
-             field_type = long
-             field_value = [
-                 {
-                     rule_type = NOT_NULL
-                 },
-                 {
-                     rule_type = MIN
-                     rule_value = 0
-                 },
-                 {
-                     rule_type = MAX
-                     rule_value = 99
-                 }
-             ]
-          }
-          ]
-       }
+sink {
+  Console {}
+
+  Assert {
+    rules =
+      {
+        field_rules = [
+          {
+            field_name = id
+            field_type = long
+            field_value = [
 
-   }
-}
\ No newline at end of file
+              {
+                rule_type = MIN
+                rule_value = 100
+              },
+              {
+                rule_type = MAX
+                rule_value = 149
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
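
group_offsets starts from the consumer group's committed position rather
than a fixed offset. In KafkaSourceStartConfigToConsoleIT this conf runs
after generateStepTestData(100, 150) has appended ids 100-149, and the
assertions expect exactly that range, i.e. the group's offsets are
presumably already committed at 100 by the preceding runs. For reference,
this is how a committed position is established with the stock client
(sketch only; it needs group.id set on the consumer, and OffsetAndMetadata
is org.apache.kafka.clients.consumer.OffsetAndMetadata):

    TopicPartition tp = new TopicPartition("test_topic", 0);
    consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
    // A consumer in the same group that starts in group_offsets mode now
    // resumes at offset 100, i.e. at id = 100 in this test's data.
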
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
index 91c920237..cf5c67743 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
@@ -20,7 +20,7 @@
 
 env {
     # You can set spark configuration here
-    spark.app.name = "SeaTunnel"
+    job.name = "SeaTunnel"
     source.parallelism = 1
     job.mode = "BATCH"
 }
@@ -28,7 +28,7 @@ env {
 
 source {
   Kafka {
-    bootstrap.servers = "kafkaCluster:9093"
+    bootstrap.servers = "kafkaCluster:9094"
     topic = "test_topic"
     result_table_name = "kafka_table"
     kafka.auto.offset.reset = "earliest"
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
similarity index 53%
copy from 
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to 
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
index 91c920237..0c20ebc9a 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
@@ -20,37 +20,24 @@
 
 env {
     # You can set spark configuration here
-    spark.app.name = "SeaTunnel"
+    job.name = "SeaTunnel"
     source.parallelism = 1
     job.mode = "BATCH"
 }
 
-
 source {
   Kafka {
-    bootstrap.servers = "kafkaCluster:9093"
+    bootstrap.servers = "kafkaCluster:9094"
     topic = "test_topic"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    # The format defaults to json, so this option is optional
+    format = json
+    start_mode = latest
     schema = {
-      fields {
-           id = bigint
-           c_map = "map<string, smallint>"
-           c_array = "array<tinyint>"
-           c_string = string
-           c_boolean = boolean
-           c_tinyint = tinyint
-           c_smallint = smallint
-           c_int = int
-           c_bigint = bigint
-           c_float = float
-           c_double = double
-           c_decimal = "decimal(2, 1)"
-           c_bytes = bytes
-           c_date = date
-           c_timestamp = timestamp
-      }
-    }
+      fields {
+        id = bigint
+      }
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
@@ -58,33 +45,31 @@ source {
 }
 
 transform {
-}
+ }
 
-sink {
-  Console {}
-  Assert {
-     rules =
-       {
-         field_rules = [
-                {
-             field_name = id
-             field_type = long
-             field_value = [
-                 {
-                     rule_type = NOT_NULL
-                 },
-                 {
-                     rule_type = MIN
-                     rule_value = 0
-                 },
-                 {
-                     rule_type = MAX
-                     rule_value = 99
-                 }
-             ]
-          }
-          ]
-       }
+sink {
+  Console {}
+
+  Assert {
+    rules =
+      {
+        field_rules = [
+          {
+            field_name = id
+            field_type = long
+            field_value = [
 
-   }
-}
\ No newline at end of file
+              {
+                rule_type = MIN
+                rule_value = 99
+              },
+              {
+                rule_type = MAX
+                rule_value = 99
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
similarity index 53%
copy from 
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to 
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
index 91c920237..8f6f00a68 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
@@ -20,37 +20,28 @@
 
 env {
     # You can set spark configuration here
-    spark.app.name = "SeaTunnel"
+    job.name = "SeaTunnel"
     source.parallelism = 1
     job.mode = "BATCH"
 }
 
-
 source {
   Kafka {
-    bootstrap.servers = "kafkaCluster:9093"
+    bootstrap.servers = "kafkaCluster:9094"
     topic = "test_topic"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    # The format defaults to json, so this option is optional
+    format = json
+    start_mode = specific_offsets
     schema = {
-      fields {
-           id = bigint
-           c_map = "map<string, smallint>"
-           c_array = "array<tinyint>"
-           c_string = string
-           c_boolean = boolean
-           c_tinyint = tinyint
-           c_smallint = smallint
-           c_int = int
-           c_bigint = bigint
-           c_float = float
-           c_double = double
-           c_decimal = "decimal(2, 1)"
-           c_bytes = bytes
-           c_date = date
-           c_timestamp = timestamp
-      }
-    }
+      fields {
+        id = bigint
+      }
+    }
+
+    start_mode.offsets = {
+      test_topic-0 = 50
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
@@ -58,33 +49,31 @@ source {
 }
 
 transform {
-}
+ }
 
-sink {
-  Console {}
-  Assert {
-     rules =
-       {
-         field_rules = [
-                {
-             field_name = id
-             field_type = long
-             field_value = [
-                 {
-                     rule_type = NOT_NULL
-                 },
-                 {
-                     rule_type = MIN
-                     rule_value = 0
-                 },
-                 {
-                     rule_type = MAX
-                     rule_value = 99
-                 }
-             ]
-          }
-          ]
-       }
+sink {
+  Console {}
+
+  Assert {
+    rules =
+      {
+        field_rules = [
+          {
+            field_name = id
+            field_type = long
+            field_value = [
 
-   }
-}
\ No newline at end of file
+              {
+                rule_type = MIN
+                rule_value = 50
+              },
+              {
+                rule_type = MAX
+                rule_value = 99
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
index 369f34a2f..94af38e64 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
@@ -20,7 +20,7 @@
 
 env {
     # You can set spark configuration here
-    spark.app.name = "SeaTunnel"
+    job.name = "SeaTunnel"
     source.parallelism = 1
     job.mode = "BATCH"
 }
@@ -28,7 +28,7 @@ env {
 
 source {
   Kafka {
-    bootstrap.servers = "kafkaCluster:9093"
+    bootstrap.servers = "kafkaCluster:9094"
     topic = "test_topic"
     result_table_name = "kafka_table"
     kafka.auto.offset.reset = "earliest"
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
similarity index 53%
copy from 
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to 
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
index 91c920237..e7ec35c7d 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
@@ -20,37 +20,25 @@
 
 env {
     # You can set spark configuration here
-    spark.app.name = "SeaTunnel"
+    job.name = "SeaTunnel"
     source.parallelism = 1
     job.mode = "BATCH"
 }
 
-
 source {
   Kafka {
-    bootstrap.servers = "kafkaCluster:9093"
+    bootstrap.servers = "kafkaCluster:9094"
     topic = "test_topic"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    # The format defaults to json, so this option is optional
+    format = json
+    start_mode = timestamp
     schema = {
-      fields {
-           id = bigint
-           c_map = "map<string, smallint>"
-           c_array = "array<tinyint>"
-           c_string = string
-           c_boolean = boolean
-           c_tinyint = tinyint
-           c_smallint = smallint
-           c_int = int
-           c_bigint = bigint
-           c_float = float
-           c_double = double
-           c_decimal = "decimal(2, 1)"
-           c_bytes = bytes
-           c_date = date
-           c_timestamp = timestamp
-      }
-    }
+      fields {
+        id = bigint
+      }
+    }
+    start_mode.timestamp = 1667179890315
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
@@ -58,33 +46,31 @@ source {
 }
 
 transform {
-}
+ }
 
-sink {
-  Console {}
-  Assert {
-     rules =
-       {
-         field_rules = [
-                {
-             field_name = id
-             field_type = long
-             field_value = [
-                 {
-                     rule_type = NOT_NULL
-                 },
-                 {
-                     rule_type = MIN
-                     rule_value = 0
-                 },
-                 {
-                     rule_type = MAX
-                     rule_value = 99
-                 }
-             ]
-          }
-          ]
-       }
+sink {
+  Console {}
+
+  Assert {
+    rules =
+      {
+        field_rules = [
+          {
+            field_name = id
+            field_type = long
+            field_value = [
 
-   }
-}
\ No newline at end of file
+              {
+                rule_type = MIN
+                rule_value = 0
+              },
+              {
+                rule_type = MAX
+                rule_value = 149
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
