This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f65f44f44 [Improve][Connector-V2][Kafka] Support to specify multiple 
partition keys (#3230)
f65f44f44 is described below

commit f65f44f44c4ca8bc43760b8be0666412f3284cf8
Author: Harvey Yue <[email protected]>
AuthorDate: Thu Nov 17 15:22:59 2022 +0800

    [Improve][Connector-V2][Kafka] Support to specify multiple partition keys 
(#3230)
---
 docs/en/connector-v2/sink/Kafka.md                 |  35 +--
 .../connectors/seatunnel/kafka/config/Config.java  |   6 +-
 .../serialize/DefaultSeaTunnelRowSerializer.java   |  68 +++--
 .../kafka/serialize/SeaTunnelRowSerializer.java    |   9 -
 .../seatunnel/kafka/sink/KafkaSinkFactory.java     |   2 +-
 .../seatunnel/kafka/sink/KafkaSinkWriter.java      |  78 ++----
 .../connector-kafka-e2e}/pom.xml                   |  17 +-
 .../e2e/connector}/kafka/KafkaContainer.java       |   2 +-
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     | 295 +++++++++++++++++++++
 .../kafka/kafkasource_earliest_to_console.conf     |   5 +-
 .../kafka/kafkasource_group_offset_to_console.conf |   4 +-
 .../kafka/kafkasource_latest_to_console.conf       |   4 +-
 .../kafkasource_specific_offsets_to_console.conf   |   6 +-
 .../kafka/kafkasource_timestamp_to_console.conf    |   6 +-
 .../test/resources/kafkasink_fake_to_kafka.conf    |  68 +++++
 .../resources}/kafkasource_json_to_console.conf    |  16 +-
 .../resources}/kafkasource_text_to_console.conf    |  16 +-
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 .../e2e/flink/v2/kafka/KafkaContainer.java         | 151 -----------
 .../kafka/KafkaSourceStartConfigToConsoleIT.java   |  98 -------
 .../e2e/flink/v2/kafka/KafkaTestBaseIT.java        |  90 -------
 .../kafka/kafkasource_json_to_console.conf         |  91 -------
 .../kafka/kafkasource_text_to_console.conf         |  92 -------
 .../seatunnel-flink-connector-v2-e2e/pom.xml       |   1 -
 .../connector-kafka-spark-e2e/pom.xml              |  53 ----
 .../spark/v2/kafka/KafkaSourceJsonToConsoleIT.java | 122 ---------
 .../kafka/KafkaSourceStartConfigToConsoleIT.java   |  98 -------
 .../spark/v2/kafka/KafkaSourceTextToConsoleIT.java | 125 ---------
 .../e2e/spark/v2/kafka/KafkaTestBaseIT.java        |  90 -------
 .../kafka/kafkasource_earliest_to_console.conf     |  75 ------
 .../kafka/kafkasource_group_offset_to_console.conf |  75 ------
 .../kafka/kafkasource_latest_to_console.conf       |  75 ------
 .../kafkasource_specific_offsets_to_console.conf   |  79 ------
 .../kafka/kafkasource_timestamp_to_console.conf    |  76 ------
 .../seatunnel-spark-connector-v2-e2e/pom.xml       |   1 -
 35 files changed, 508 insertions(+), 1522 deletions(-)

diff --git a/docs/en/connector-v2/sink/Kafka.md 
b/docs/en/connector-v2/sink/Kafka.md
index 74de8a57a..d480d5ab5 100644
--- a/docs/en/connector-v2/sink/Kafka.md
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -15,17 +15,17 @@ By default, we will use 2pc to guarantee the message is 
sent to kafka exactly on
 
 ## Options
 
-| name               | type                   | required | default value |
-| ------------------ | ---------------------- | -------- | ------------- |
-| topic              | string                 | yes      | -             |
-| bootstrap.servers  | string                 | yes      | -             |
-| kafka.*            | kafka producer config  | no       | -             |
-| semantic           | string                 | no       | NON           |
-| partition_key      | string                 | no       | -             |
-| partition          | int                    | no       | -             |
-| assign_partitions  | list                   | no       | -             |
-| transaction_prefix | string                 | no       | -             |
-| common-options     | config                 | no       | -             |
+| name                 | type                  | required | default value |
+|----------------------|-----------------------| -------- | ------------- |
+| topic                | string                | yes      | -             |
+| bootstrap.servers    | string                | yes      | -             |
+| kafka.*              | kafka producer config | no       | -             |
+| semantic             | string                | no       | NON           |
+| partition_key_fields | array                 | no       | -             |
+| partition            | int                   | no       | -             |
+| assign_partitions    | array                 | no       | -             |
+| transaction_prefix   | string                | no       | -             |
+| common-options       | config                | no       | -             |
 
 ### topic [string]
 
@@ -51,11 +51,11 @@ In AT_LEAST_ONCE, producer will wait for all outstanding 
messages in the Kafka b
 
 NON does not provide any guarantees: messages may be lost in case of issues on 
the Kafka broker and messages may be duplicated.
 
-### partition_key [string]
+### partition_key_fields [array]
 
-Configure which field is used as the key of the kafka message.
+Configure which fields are used as the key of the kafka message.
 
-For example, if you want to use value of a field from upstream data as key, 
you can assign it to the field name.
+For example, if you want to use the values of fields from upstream data as the key, you 
can assign those field names to this property.
 
 Upstream data is the following:
 
@@ -66,13 +66,13 @@ Upstream data is the following:
 
 If name is set as the key, then the hash value of the name column will 
determine which partition the message is sent to.
 
-If the field name does not exist in the upstream data, the configured 
parameter will be used as the key.
+If the partition key fields are not set, messages will be sent with a null key.
 
 ### partition [int]
 
 We can specify the partition, all messages will be sent to this partition.
 
-### assign_partitions [list]
+### assign_partitions [array]
 
 We can decide which partition to send based on the content of the message. The 
function of this parameter is to distribute information.
 
@@ -113,3 +113,6 @@ sink {
 ### 2.3.0-beta 2022-10-20
 
 - Add Kafka Sink Connector
+### next version
+
+- [Feature] Support to specify multiple partition keys 
[3230](https://github.com/apache/incubator-seatunnel/pull/3230)
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 8cf6726fe..6030bfa39 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -101,10 +101,10 @@ public class Config {
             .withDescription("We can decide which partition to send based on 
the content of the message. " +
                     "The function of this parameter is to distribute 
information.");
 
-    public static final Option<String> PARTITION_KEY = 
Options.key("partition_key")
-            .stringType()
+    public static final Option<List<String>> PARTITION_KEY_FIELDS = 
Options.key("partition_key_fields")
+            .listType()
             .noDefaultValue()
-            .withDescription("Configure which field is used as the key of the 
kafka message.");
+            .withDescription("Configure which fields are used as the key of 
the kafka message.");
 
     public static final Option<StartMode> START_MODE = 
Options.key("start_mode")
             .objectType(StartMode.class)
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 24a2b242f..b7c43acbb 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -17,42 +17,78 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;
 
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 
+import java.util.List;
+import java.util.function.Function;
+
 public class DefaultSeaTunnelRowSerializer implements 
SeaTunnelRowSerializer<byte[], byte[]> {
 
-    private int partation = -1;
+    private Integer partition;
     private final String topic;
-    private final JsonSerializationSchema jsonSerializationSchema;
+    private final SerializationSchema keySerialization;
+    private final SerializationSchema valueSerialization;
 
     public DefaultSeaTunnelRowSerializer(String topic, SeaTunnelRowType 
seaTunnelRowType) {
-        this.topic = topic;
-        this.jsonSerializationSchema = new 
JsonSerializationSchema(seaTunnelRowType);
+        this(topic, element -> null, 
createSerializationSchema(seaTunnelRowType));
     }
 
-    public DefaultSeaTunnelRowSerializer(String topic, int partation, 
SeaTunnelRowType seaTunnelRowType) {
+    public DefaultSeaTunnelRowSerializer(String topic, Integer partition, 
SeaTunnelRowType seaTunnelRowType) {
         this(topic, seaTunnelRowType);
-        this.partation = partation;
+        this.partition = partition;
+    }
+
+    public DefaultSeaTunnelRowSerializer(String topic,
+                                         List<String> keyFieldNames,
+                                         SeaTunnelRowType seaTunnelRowType) {
+        this(topic, createKeySerializationSchema(keyFieldNames, 
seaTunnelRowType),
+                createSerializationSchema(seaTunnelRowType));
+    }
+
+    public DefaultSeaTunnelRowSerializer(String topic,
+                                         SerializationSchema keySerialization,
+                                         SerializationSchema 
valueSerialization) {
+        this.topic = topic;
+        this.keySerialization = keySerialization;
+        this.valueSerialization = valueSerialization;
     }
 
     @Override
     public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
-        if (this.partation != -1) {
-            return new ProducerRecord<>(topic, this.partation, null, 
jsonSerializationSchema.serialize(row));
-        }
-        else {
-            return new ProducerRecord<>(topic, null, 
jsonSerializationSchema.serialize(row));
-        }
+        return new ProducerRecord<>(topic, partition,
+                keySerialization.serialize(row), 
valueSerialization.serialize(row));
     }
 
-    @Override
-    public ProducerRecord<byte[], byte[]> serializeRowByKey(String key, 
SeaTunnelRow row) {
-        //if the key is null, kafka will send message to a random partition
-        return new ProducerRecord<>(topic, key == null ? null : 
key.getBytes(), jsonSerializationSchema.serialize(row));
+    private static SerializationSchema 
createSerializationSchema(SeaTunnelRowType rowType) {
+        return new JsonSerializationSchema(rowType);
     }
 
+    private static SerializationSchema 
createKeySerializationSchema(List<String> keyFieldNames,
+                                                                    
SeaTunnelRowType seaTunnelRowType) {
+        int[] keyFieldIndexArr = new int[keyFieldNames.size()];
+        SeaTunnelDataType[] keyFieldDataTypeArr = new 
SeaTunnelDataType[keyFieldNames.size()];
+        for (int i = 0; i < keyFieldNames.size(); i++) {
+            String keyFieldName = keyFieldNames.get(i);
+            int rowFieldIndex = seaTunnelRowType.indexOf(keyFieldName);
+            keyFieldIndexArr[i] = rowFieldIndex;
+            keyFieldDataTypeArr[i] = 
seaTunnelRowType.getFieldType(rowFieldIndex);
+        }
+        SeaTunnelRowType keyType = new 
SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr);
+        SerializationSchema keySerializationSchema = 
createSerializationSchema(keyType);
+
+        Function<SeaTunnelRow, SeaTunnelRow> keyDataExtractor = row -> {
+            Object[] keyFields = new Object[keyFieldIndexArr.length];
+            for (int i = 0; i < keyFieldIndexArr.length; i++) {
+                keyFields[i] = row.getField(keyFieldIndexArr[i]);
+            }
+            return new SeaTunnelRow(keyFields);
+        };
+        return row -> 
keySerializationSchema.serialize(keyDataExtractor.apply(row));
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
index d96753155..9f12591ea 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
@@ -30,13 +30,4 @@ public interface SeaTunnelRowSerializer<K, V> {
      * @return kafka record.
      */
     ProducerRecord<K, V> serializeRow(SeaTunnelRow row);
-
-    /**
-     * Use Key serialize the {@link SeaTunnelRow} to a Kafka {@link 
ProducerRecord}.
-     *
-     * @param key String
-     * @param row seatunnel row
-     * @return kafka record.
-     */
-    ProducerRecord<K, V> serializeRowByKey(String key, SeaTunnelRow row);
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index fb9b1e5c4..fef40310e 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -36,7 +36,7 @@ public class KafkaSinkFactory implements TableSinkFactory {
         return OptionRule.builder()
                 .required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
                 .optional(Config.KAFKA_CONFIG_PREFIX, 
Config.ASSIGN_PARTITIONS, Config.TRANSACTION_PREFIX)
-                .exclusive(Config.PARTITION, Config.PARTITION_KEY)
+                .exclusive(Config.PARTITION, Config.PARTITION_KEY_FIELDS)
                 .build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 8a7598fc1..dc143ec60 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -18,8 +18,9 @@
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY_FIELDS;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;
 
@@ -40,11 +41,11 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
-import java.util.function.Function;
 
 /**
  * KafkaSinkWriter is a sink writer that will write {@link SeaTunnelRow} to 
Kafka.
@@ -52,44 +53,21 @@ import java.util.function.Function;
 public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, 
KafkaCommitInfo, KafkaSinkState> {
 
     private final SinkWriter.Context context;
-    private final Config pluginConfig;
-    private final Function<SeaTunnelRow, String> partitionExtractor;
 
     private String transactionPrefix;
     private long lastCheckpointId = 0;
-    private int partition;
 
     private final KafkaProduceSender<byte[], byte[]> kafkaProducerSender;
     private final SeaTunnelRowSerializer<byte[], byte[]> 
seaTunnelRowSerializer;
 
     private static final int PREFIX_RANGE = 10000;
 
-    // check config
-    @Override
-    public void write(SeaTunnelRow element) {
-        ProducerRecord<byte[], byte[]> producerRecord = null;
-        //Determine the partition of the kafka send message based on the field 
name
-        if (pluginConfig.hasPath(PARTITION_KEY.key())){
-            String key = partitionExtractor.apply(element);
-            producerRecord = seaTunnelRowSerializer.serializeRowByKey(key, 
element);
-        }
-        else {
-            producerRecord = seaTunnelRowSerializer.serializeRow(element);
-        }
-        kafkaProducerSender.send(producerRecord);
-    }
-
     public KafkaSinkWriter(
             SinkWriter.Context context,
             SeaTunnelRowType seaTunnelRowType,
             Config pluginConfig,
             List<KafkaSinkState> kafkaStates) {
         this.context = context;
-        this.pluginConfig = pluginConfig;
-        this.partitionExtractor = createPartitionExtractor(pluginConfig, 
seaTunnelRowType);
-        if (pluginConfig.hasPath(PARTITION.key())) {
-            this.partition = pluginConfig.getInt(PARTITION.key());
-        }
         if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) {
             
MessageContentPartitioner.setAssignPartitions(pluginConfig.getStringList(ASSIGN_PARTITIONS.key()));
         }
@@ -116,6 +94,12 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
         }
     }
 
+    @Override
+    public void write(SeaTunnelRow element) {
+        ProducerRecord<byte[], byte[]> producerRecord = 
seaTunnelRowSerializer.serializeRow(element);
+        kafkaProducerSender.send(producerRecord);
+    }
+
     @Override
     public List<KafkaSinkState> snapshotState(long checkpointId) {
         List<KafkaSinkState> states = 
kafkaProducerSender.snapshotState(checkpointId);
@@ -145,8 +129,7 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
     }
 
     private Properties getKafkaProperties(Config pluginConfig) {
-        Config kafkaConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig,
-                
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX.key(),
 false);
+        Config kafkaConfig = 
TypesafeConfigUtils.extractSubConfig(pluginConfig, KAFKA_CONFIG_PREFIX.key(), 
false);
         Properties kafkaProperties = new Properties();
         kafkaConfig.entrySet().forEach(entry -> {
             kafkaProperties.put(entry.getKey(), entry.getValue().unwrapped());
@@ -160,13 +143,13 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
         return kafkaProperties;
     }
 
-    // todo: parse the target field from config
     private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config 
pluginConfig, SeaTunnelRowType seaTunnelRowType) {
-        if (pluginConfig.hasPath(PARTITION.key())){
-            return new 
DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()), 
this.partition, seaTunnelRowType);
-        }
-        else {
-            return new 
DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()), 
seaTunnelRowType);
+        if (pluginConfig.hasPath(PARTITION.key())) {
+            return new 
DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
+                    pluginConfig.getInt(PARTITION.key()), seaTunnelRowType);
+        } else {
+            return new 
DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
+                    getPartitionKeyFields(pluginConfig, seaTunnelRowType), 
seaTunnelRowType);
         }
     }
 
@@ -188,23 +171,18 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
         }
     }
 
-    private Function<SeaTunnelRow, String> createPartitionExtractor(Config 
pluginConfig,
-                                                                    
SeaTunnelRowType seaTunnelRowType) {
-        if (!pluginConfig.hasPath(PARTITION_KEY.key())){
-            return row -> null;
-        }
-        String partitionKey = pluginConfig.getString(PARTITION_KEY.key());
-        List<String> fieldNames = 
Arrays.asList(seaTunnelRowType.getFieldNames());
-        if (!fieldNames.contains(partitionKey)) {
-            return row -> partitionKey;
-        }
-        int partitionFieldIndex = seaTunnelRowType.indexOf(partitionKey);
-        return row -> {
-            Object partitionFieldValue = row.getField(partitionFieldIndex);
-            if (partitionFieldValue != null) {
-                return partitionFieldValue.toString();
+    private List<String> getPartitionKeyFields(Config pluginConfig, 
SeaTunnelRowType seaTunnelRowType) {
+        if (pluginConfig.hasPath(PARTITION_KEY_FIELDS.key())) {
+            List<String> partitionKeyFields = 
pluginConfig.getStringList(PARTITION_KEY_FIELDS.key());
+            List<String> rowTypeFieldNames = 
Arrays.asList(seaTunnelRowType.getFieldNames());
+            for (String partitionKeyField : partitionKeyFields) {
+                if (!rowTypeFieldNames.contains(partitionKeyField)) {
+                    throw new IllegalArgumentException(String.format(
+                            "Partition key field not found: %s, rowType: %s", 
partitionKeyField, rowTypeFieldNames));
+                }
             }
-            return null;
-        };
+            return partitionKeyFields;
+        }
+        return Collections.emptyList();
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
 b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
similarity index 88%
rename from 
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
index 2eccfa9c5..d2ee1f526 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
@@ -18,39 +18,36 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <parent>
         <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>connector-kafka-flink-e2e</artifactId>
+    <artifactId>connector-kafka-e2e</artifactId>
 
     <dependencies>
+        <!-- SeaTunnel connectors -->
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-flink-e2e-base</artifactId>
+            <artifactId>connector-kafka</artifactId>
             <version>${project.version}</version>
-            <classifier>tests</classifier>
-            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
-
-        <!-- SeaTunnel connectors -->
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-kafka</artifactId>
+            <artifactId>connector-console</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-console</artifactId>
+            <artifactId>connector-assert</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-assert</artifactId>
+            <artifactId>connector-fake</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaContainer.java
similarity index 99%
rename from 
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaContainer.java
index 117daf78e..c3fa2cb3b 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaContainer.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.e2e.spark.v2.kafka;
+package org.apache.seatunnel.e2e.connector.kafka;
 
 import com.github.dockerjava.api.command.InspectContainerResponse;
 import lombok.SneakyThrows;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
new file mode 100644
index 000000000..6d907cd09
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.kafka;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class KafkaIT extends TestSuiteBase implements TestResource {
+    // Docker image used for the Kafka broker started by this test.
+    private static final String KAFKA_IMAGE_NAME = 
"confluentinc/cp-kafka:6.2.1";
+
+    // Port bound one-to-one between host and container in startUp().
+    private static final int KAFKA_PORT = 9093;
+
+    // Network alias of the broker; the job configs in this change address it as "kafkaCluster:9092".
+    private static final String KAFKA_HOST = "kafkaCluster";
+
+    // Host-side producer used to seed the test topics; assigned in initKafkaProducer().
+    private KafkaProducer<byte[], byte[]> producer;
+
+    // Broker container; created in startUp() and closed in tearDown().
+    private KafkaContainer kafkaContainer;
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        // Start the broker on the shared test network under the alias used by the job configs.
+        final DockerImageName image = DockerImageName.parse(KAFKA_IMAGE_NAME);
+        kafkaContainer = new KafkaContainer(image)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(KAFKA_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+        final String portBinding = String.format("%s:%s", KAFKA_PORT, KAFKA_PORT);
+        kafkaContainer.setPortBindings(Lists.newArrayList(portBinding));
+        Startables.deepStart(Stream.of(kafkaContainer)).join();
+        log.info("Kafka container started");
+
+        // Retry producer creation, ignoring exceptions, for at most 180 seconds.
+        Awaitility.given().ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initKafkaProducer);
+
+        log.info("Write 100 records to topic test_topic_source");
+        DefaultSeaTunnelRowSerializer sourceSerializer =
+                new DefaultSeaTunnelRowSerializer("test_topic_source", SEATUNNEL_ROW_TYPE);
+        generateTestData(sourceSerializer::serializeRow, 0, 100);
+    }
+
+    /**
+     * Closes the producer and the Kafka container if they were created.
+     * The container is closed in a finally block so it is still released
+     * when closing the producer throws.
+     *
+     * @throws Exception if closing either resource fails
+     */
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            if (producer != null) {
+                producer.close();
+            }
+        } finally {
+            if (kafkaContainer != null) {
+                kafkaContainer.close();
+            }
+        }
+    }
+
+    /**
+     * Runs the fake-to-Kafka sink job, reads topic "test_topic" back from the host,
+     * and checks that a record key is a JSON object containing the fields "c_map"
+     * and "c_string" and that 10 distinct keys were collected.
+     */
+    @TestTemplate
+    public void testSinkKafka(TestContainer container) throws IOException, 
InterruptedException {
+        Container.ExecResult execResult = 
container.executeJob("/kafkasink_fake_to_kafka.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+
+        String topicName = "test_topic";
+        // Records are collected by key, so records sharing a key collapse into a single entry.
+        Map<String, String> data = new HashMap<>();
+        ObjectMapper objectMapper = new ObjectMapper();
+        try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(kafkaConsumerConfig())) {
+            consumer.subscribe(Arrays.asList(topicName));
+            // Capture the end offset of partition 0; it is the stop condition of the poll loop below.
+            Map<TopicPartition, Long> offsets = 
consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
+            Long endOffset = offsets.entrySet().iterator().next().getValue();
+            Long lastProcessedOffset = -1L;
+
+            // Keep polling until the last consumed offset reaches endOffset - 1.
+            // NOTE(review): there is no timeout here; the loop relies on the records arriving.
+            do {
+                ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(100));
+                for (ConsumerRecord<String, String> record : records) {
+                    if (lastProcessedOffset < record.offset()) {
+
+                        data.put(record.key(), record.value());
+                    }
+                    lastProcessedOffset = record.offset();
+                }
+            } while (lastProcessedOffset < endOffset - 1);
+        }
+        // Inspect one arbitrary key: it is parsed as JSON and must expose both checked fields.
+        String key = data.keySet().iterator().next();
+        ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class);
+        Assertions.assertTrue(objectNode.has("c_map"));
+        Assertions.assertTrue(objectNode.has("c_string"));
+        Assertions.assertEquals(10, data.size());
+    }
+
+    @TestTemplate
+    public void testSourceKafkaTextToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        // Seed test_topic_text with comma-delimited text rows (null key) before running the job.
+        TextSerializationSchema textSchema = TextSerializationSchema.builder()
+                .seaTunnelRowType(SEATUNNEL_ROW_TYPE)
+                .delimiter(",")
+                .build();
+        ProducerRecordConverter toTextRecord =
+                row -> new ProducerRecord<>("test_topic_text", null, textSchema.serialize(row));
+        generateTestData(toTextRecord, 0, 100);
+
+        Container.ExecResult jobResult = container.executeJob("/kafkasource_text_to_console.conf");
+        Assertions.assertEquals(0, jobResult.getExitCode(), jobResult.getStderr());
+    }
+
+    @TestTemplate
+    public void testSourceKafkaJsonToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        // Seed test_topic_json with 100 rows serialized by the connector's default serializer.
+        DefaultSeaTunnelRowSerializer jsonSerializer =
+                new DefaultSeaTunnelRowSerializer("test_topic_json", SEATUNNEL_ROW_TYPE);
+        generateTestData(jsonSerializer::serializeRow, 0, 100);
+
+        Container.ExecResult jobResult = container.executeJob("/kafkasource_json_to_console.conf");
+        Assertions.assertEquals(0, jobResult.getExitCode(), jobResult.getStderr());
+    }
+
+    /**
+     * Runs the latest, earliest, specific-offsets and timestamp start-mode source jobs
+     * one after another on the same test container.
+     */
+    @TestTemplate
+    public void testSourceKafka(TestContainer container) throws IOException, 
InterruptedException {
+        testKafkaLatestToConsole(container);
+        testKafkaEarliestToConsole(container);
+        testKafkaSpecificOffsetsToConsole(container);
+        testKafkaTimestampToConsole(container);
+    }
+
+    @TestTemplate
+    public void testSourceKafkaStartConfig(TestContainer container)
+            throws IOException, InterruptedException {
+        // Seed test_topic_group with ids 100..149, then run the group-offsets job.
+        DefaultSeaTunnelRowSerializer groupSerializer =
+                new DefaultSeaTunnelRowSerializer("test_topic_group", SEATUNNEL_ROW_TYPE);
+        generateTestData(groupSerializer::serializeRow, 100, 150);
+        testKafkaGroupOffsetsToConsole(container);
+    }
+
+    /** Runs the source job that starts from the latest offsets and expects exit code 0. */
+    public void testKafkaLatestToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        executeJobAndAssertSuccess(container, "/kafka/kafkasource_latest_to_console.conf");
+    }
+
+    /** Runs the source job that starts from the earliest offsets and expects exit code 0. */
+    public void testKafkaEarliestToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        executeJobAndAssertSuccess(container, "/kafka/kafkasource_earliest_to_console.conf");
+    }
+
+    /** Runs the source job that starts from explicitly configured offsets and expects exit code 0. */
+    public void testKafkaSpecificOffsetsToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        executeJobAndAssertSuccess(container, "/kafka/kafkasource_specific_offsets_to_console.conf");
+    }
+
+    /** Runs the source job that starts from the consumer group's offsets and expects exit code 0. */
+    public void testKafkaGroupOffsetsToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        executeJobAndAssertSuccess(container, "/kafka/kafkasource_group_offset_to_console.conf");
+    }
+
+    /** Runs the source job that starts from a configured timestamp and expects exit code 0. */
+    public void testKafkaTimestampToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        executeJobAndAssertSuccess(container, "/kafka/kafkasource_timestamp_to_console.conf");
+    }
+
+    /**
+     * Executes the given job config on the container and fails, reporting the job's stderr,
+     * unless the job exits with code 0.
+     *
+     * @param container engine container the job is submitted to
+     * @param confFile  classpath-style path of the job config, e.g. "/kafka/xxx.conf"
+     */
+    private void executeJobAndAssertSuccess(TestContainer container, String confFile)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob(confFile);
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    private void initKafkaProducer() {
+        // Build a producer for raw byte[] keys and values against the address the container reports.
+        final String brokerAddress = kafkaContainer.getBootstrapServers();
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producer = new KafkaProducer<>(producerProps);
+    }
+
+    /**
+     * Builds the consumer configuration used to read sink output back from the broker:
+     * String keys and values, group "seatunnel-kafka-sink-group", offset reset "earliest".
+     *
+     * @return a fresh Properties instance for a KafkaConsumer&lt;String, String&gt;
+     */
+    private Properties kafkaConsumerConfig() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "seatunnel-kafka-sink-group");
+        // Lower-case with Locale.ROOT: the default-locale variant turns 'I' into a dotless 'ı'
+        // under a Turkish locale, which would yield an invalid auto.offset.reset value.
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                OffsetResetStrategy.EARLIEST.toString().toLowerCase(java.util.Locale.ROOT));
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        return props;
+    }
+
+    /**
+     * Sends one synthetic row per id in [start, end) through the shared producer and
+     * waits until all of them have been handed to the broker.
+     *
+     * @param converter turns each generated row into the record to send (topic, key, value)
+     * @param start     first id, inclusive; stored in the row's first field
+     * @param end       last id, exclusive
+     */
+    @SuppressWarnings("checkstyle:Indentation")
+    private void generateTestData(ProducerRecordConverter converter, int start, int end) {
+        for (int i = start; i < end; i++) {
+            // Values are listed in the same order as the fields of SEATUNNEL_ROW_TYPE.
+            SeaTunnelRow row = new SeaTunnelRow(
+                    new Object[]{
+                            Long.valueOf(i),
+                            Collections.singletonMap("key", Short.parseShort("1")),
+                            new Byte[]{Byte.parseByte("1")},
+                            "string",
+                            Boolean.FALSE,
+                            Byte.parseByte("1"),
+                            Short.parseShort("1"),
+                            Integer.parseInt("1"),
+                            Long.parseLong("1"),
+                            Float.parseFloat("1.1"),
+                            Double.parseDouble("1.1"),
+                            BigDecimal.valueOf(11, 1),
+                            // Explicit charset so the payload does not depend on the platform default.
+                            "test".getBytes(java.nio.charset.StandardCharsets.UTF_8),
+                            LocalDate.now(),
+                            LocalDateTime.now()
+                    });
+            ProducerRecord<byte[], byte[]> producerRecord = converter.convert(row);
+            producer.send(producerRecord);
+        }
+        // send() only enqueues records; flush so they are all on the broker before the
+        // job under test starts reading the topic.
+        producer.flush();
+    }
+
+    // Row schema shared by the serializers in this test: field names and, position by
+    // position, their SeaTunnel types. generateTestData() builds its rows in this order.
+    private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE = new 
SeaTunnelRowType(
+            new String[]{
+                "id",
+                "c_map",
+                "c_array",
+                "c_string",
+                "c_boolean",
+                "c_tinyint",
+                "c_smallint",
+                "c_int",
+                "c_bigint",
+                "c_float",
+                "c_double",
+                "c_decimal",
+                "c_bytes",
+                "c_date",
+                "c_timestamp"
+            },
+            new SeaTunnelDataType[]{
+                BasicType.LONG_TYPE,
+                new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+                ArrayType.BYTE_ARRAY_TYPE,
+                BasicType.STRING_TYPE,
+                BasicType.BOOLEAN_TYPE,
+                BasicType.BYTE_TYPE,
+                BasicType.SHORT_TYPE,
+                BasicType.INT_TYPE,
+                BasicType.LONG_TYPE,
+                BasicType.FLOAT_TYPE,
+                BasicType.DOUBLE_TYPE,
+                new DecimalType(2, 1),
+                PrimitiveByteArrayType.INSTANCE,
+                LocalTimeType.LOCAL_DATE_TYPE,
+                LocalTimeType.LOCAL_DATE_TIME_TYPE
+            }
+    );
+
+    /**
+     * Converts a generated row into the Kafka record to send; implemented with lambdas
+     * and method references by the tests in this class.
+     */
+    @FunctionalInterface
+    interface ProducerRecordConverter {
+        /**
+         * @param row the generated row
+         * @return the record (topic, key, value) to publish for that row
+         */
+        ProducerRecord<byte[], byte[]> convert(SeaTunnelRow row);
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
similarity index 95%
rename from 
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
index 97f9ddb5d..8e52fedfc 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
@@ -20,6 +20,7 @@
 
 env {
   # You can set flink configuration here
+  job.mode = "BATCH"
   execution.parallelism = 1
   #execution.checkpoint.interval = 10000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
@@ -27,8 +28,8 @@ env {
 
 source {
   Kafka {
-    bootstrap.servers = "kafkaCluster:9093"
-    topic = "test_topic"
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic_source"
     result_table_name = "kafka_table"
     # The default format is json, which is optional
     format = json
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
similarity index 96%
rename from 
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
index 2a7f9828c..d3ce8e6b3 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
@@ -27,8 +27,8 @@ env {
 
 source {
   Kafka {
-    bootstrap.servers = "kafkaCluster:9093"
-    topic = "test_topic"
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic_group"
     result_table_name = "kafka_table"
     # The default format is json, which is optional
     format = json
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
similarity index 96%
rename from 
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
index 736104ea0..cfc992e5b 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
@@ -27,8 +27,8 @@ env {
 
 source {
   Kafka {
-    bootstrap.servers = "kafkaCluster:9093"
-    topic = "test_topic"
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic_source"
     result_table_name = "kafka_table"
     # The default format is json, which is optional
     format = json
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
similarity index 94%
rename from 
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
index b75756327..512119851 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
@@ -27,8 +27,8 @@ env {
 
 source {
   Kafka {
-    bootstrap.servers = "kafkaCluster:9093"
-    topic = "test_topic"
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic_source"
     result_table_name = "kafka_table"
     # The default format is json, which is optional
     format = json
@@ -40,7 +40,7 @@ source {
      }
 
     start_mode.offsets = {
-                test_topic-0 = 50
+                test_topic_source-0 = 50
              }
   }
 
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
similarity index 94%
rename from 
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
index b491c6546..431205f3c 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
@@ -27,8 +27,8 @@ env {
 
 source {
   Kafka {
-    bootstrap.servers = "kafkaCluster:9093"
-    topic = "test_topic"
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic_source"
     result_table_name = "kafka_table"
     # The default format is json, which is optional
     format = json
@@ -66,7 +66,7 @@ sink  {
                              },
                              {
                                  rule_type = MAX
-                                 rule_value = 149
+                                 rule_value = 99
                              }
                          ]
                       }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf
new file mode 100644
index 000000000..086136bf5
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    schema = {
+      fields {
+        c_map = "map<string, smallint>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic"
+    partition_key_fields = ["c_map","c_string"]
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
similarity index 89%
rename from 
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
index cf5c67743..129149623 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
@@ -19,17 +19,21 @@
 ######
 
 env {
-    # You can set spark configuration here
-    job.name = "SeaTunnel"
-    source.parallelism = 1
+    execution.parallelism = 1
     job.mode = "BATCH"
-}
 
+    # You can set spark configuration here
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
 
 source {
   Kafka {
-    bootstrap.servers = "kafkaCluster:9094"
-    topic = "test_topic"
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic_json"
     result_table_name = "kafka_table"
     kafka.auto.offset.reset = "earliest"
     schema = {
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf
similarity index 90%
rename from 
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf
index 94af38e64..67879791b 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf
@@ -19,17 +19,21 @@
 ######
 
 env {
-    # You can set spark configuration here
-    job.name = "SeaTunnel"
-    source.parallelism = 1
+    execution.parallelism = 1
     job.mode = "BATCH"
-}
 
+    # You can set spark configuration here
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
 
 source {
   Kafka {
-    bootstrap.servers = "kafkaCluster:9094"
-    topic = "test_topic"
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic_text"
     result_table_name = "kafka_table"
     kafka.auto.offset.reset = "earliest"
     schema = {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index eb8614495..6d3b7703c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -34,6 +34,7 @@
         <module>connector-file-local-e2e</module>
         <module>connector-cassandra-e2e</module>
         <module>connector-http-e2e</module>
+        <module>connector-kafka-e2e</module>
     </modules>
 
     <artifactId>seatunnel-connector-v2-e2e</artifactId>
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java
deleted file mode 100644
index 7d7fe1920..000000000
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.flink.v2.kafka;
-
-import com.github.dockerjava.api.command.InspectContainerResponse;
-import lombok.SneakyThrows;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.utility.DockerImageName;
-
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.Enumeration;
-
-/**
- * This container wraps Confluent Kafka and Zookeeper (optionally)
- */
-public class KafkaContainer extends GenericContainer<KafkaContainer> {
-
-    private static final DockerImageName DEFAULT_IMAGE_NAME = 
DockerImageName.parse("confluentinc/cp-kafka");
-
-    public static final int KAFKA_PORT = 9093;
-
-    public static final int ZOOKEEPER_PORT = 2181;
-
-    private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";
-
-    protected String externalZookeeperConnect = null;
-
-    public KafkaContainer(final DockerImageName dockerImageName) {
-        super(dockerImageName);
-        dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
-
-        withExposedPorts(KAFKA_PORT);
-
-        // Use two listeners with different names, it will force Kafka to 
communicate with itself via internal
-        // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise 
Kafka will try to use the advertised listener
-        withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + 
",BROKER://0.0.0.0:9092");
-        withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", 
"BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
-        withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
-
-        withEnv("KAFKA_BROKER_ID", "1");
-        withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", 
DEFAULT_INTERNAL_TOPIC_RF);
-        withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", 
DEFAULT_INTERNAL_TOPIC_RF);
-        withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", 
DEFAULT_INTERNAL_TOPIC_RF);
-        withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", 
DEFAULT_INTERNAL_TOPIC_RF);
-        withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
-        withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
-    }
-
-    public KafkaContainer withEmbeddedZookeeper() {
-        externalZookeeperConnect = null;
-        return self();
-    }
-
-    public KafkaContainer withExternalZookeeper(String connectString) {
-        externalZookeeperConnect = connectString;
-        return self();
-    }
-
-    public String getBootstrapServers() {
-        return String.format("PLAINTEXT://%s:%s", getLinuxLocalIp(), 
getMappedPort(KAFKA_PORT));
-    }
-
-    @Override
-    protected void configure() {
-        withEnv(
-                "KAFKA_ADVERTISED_LISTENERS",
-                String.format("BROKER://%s:9092", getNetwork() != null ? 
getNetworkAliases().get(1) : "localhost")
-        );
-
-        String command = "#!/bin/bash\n";
-        if (externalZookeeperConnect != null) {
-            withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect);
-        } else {
-            addExposedPort(ZOOKEEPER_PORT);
-            withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZOOKEEPER_PORT);
-            command += "echo 'clientPort=" + ZOOKEEPER_PORT + "' > 
zookeeper.properties\n";
-            command += "echo 'dataDir=/var/lib/zookeeper/data' >> 
zookeeper.properties\n";
-            command += "echo 'dataLogDir=/var/lib/zookeeper/log' >> 
zookeeper.properties\n";
-            command += "zookeeper-server-start zookeeper.properties &\n";
-        }
-
-        // Optimization: skip the checks
-        command += "echo '' > /etc/confluent/docker/ensure \n";
-        // Run the original command
-        command += "/etc/confluent/docker/run \n";
-        withCommand("sh", "-c", command);
-    }
-
-    @Override
-    @SneakyThrows
-    protected void containerIsStarted(InspectContainerResponse containerInfo) {
-        String brokerAdvertisedListener = 
brokerAdvertisedListener(containerInfo);
-        ExecResult result = execInContainer(
-                "kafka-configs",
-                "--alter",
-                "--bootstrap-server",
-                brokerAdvertisedListener,
-                "--entity-type",
-                "brokers",
-                "--entity-name",
-                getEnvMap().get("KAFKA_BROKER_ID"),
-                "--add-config",
-                "advertised.listeners=[" + String.join(",", 
getBootstrapServers(), brokerAdvertisedListener) + "]"
-        );
-        if (result.getExitCode() != 0) {
-            throw new IllegalStateException(result.toString());
-        }
-    }
-
-    protected String brokerAdvertisedListener(InspectContainerResponse 
containerInfo) {
-        return String.format("BROKER://%s:%s", 
containerInfo.getConfig().getHostName(), "9092");
-    }
-
-    public String getLinuxLocalIp() {
-        String ip = "";
-        try {
-            Enumeration<NetworkInterface> networkInterfaces = 
NetworkInterface.getNetworkInterfaces();
-            while (networkInterfaces.hasMoreElements()) {
-                NetworkInterface networkInterface = 
networkInterfaces.nextElement();
-                Enumeration<InetAddress> inetAddresses = 
networkInterface.getInetAddresses();
-                while (inetAddresses.hasMoreElements()) {
-                    InetAddress inetAddress = inetAddresses.nextElement();
-                    if (!inetAddress.isLoopbackAddress() && inetAddress 
instanceof Inet4Address) {
-                        ip =  inetAddress.getHostAddress();
-                    }
-                }
-            }
-        } catch (SocketException ex) {
-            ex.printStackTrace();
-        }
-        return ip;
-    }
-}
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
deleted file mode 100644
index 1d7225ae8..000000000
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.flink.v2.kafka;
-
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-
-import java.io.IOException;
-
-@Slf4j
-public class KafkaSourceStartConfigToConsoleIT extends KafkaTestBaseIT {
-    @Override
-    protected void generateTestData() {
-        generateStepTestData(0, 100);
-    }
-
-    private void generateStepTestData(int start, int end) {
-
-        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
-            new String[]{
-                "id"
-            },
-            new SeaTunnelDataType[]{
-                BasicType.LONG_TYPE
-            }
-        );
-
-        DefaultSeaTunnelRowSerializer serializer = new 
DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
-        for (int i = start; i < end; i++) {
-            SeaTunnelRow row = new SeaTunnelRow(
-                new Object[]{
-                    Long.valueOf(i)
-                });
-            ProducerRecord<byte[], byte[]> producerRecord = 
serializer.serializeRow(row);
-            producer.send(producerRecord);
-        }
-    }
-
-    @Test
-    public void testKafka() throws IOException, InterruptedException {
-        testKafkaLatestToConsole();
-        testKafkaEarliestToConsole();
-        testKafkaSpecificOffsetsToConsole();
-        testKafkaGroupOffsetsToConsole();
-        testKafkaTimestampToConsole();
-    }
-
-    public void testKafkaLatestToConsole() throws IOException, 
InterruptedException {
-        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/kafka/kafkasource_latest_to_console.conf");
-        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
-    }
-
-    public void testKafkaEarliestToConsole() throws IOException, 
InterruptedException {
-        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/kafka/kafkasource_earliest_to_console.conf");
-        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
-    }
-
-    public void testKafkaSpecificOffsetsToConsole() throws IOException, 
InterruptedException {
-        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/kafka/kafkasource_specific_offsets_to_console.conf");
-        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
-    }
-
-    public void testKafkaGroupOffsetsToConsole() throws IOException, 
InterruptedException {
-        generateStepTestData(100, 150);
-        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/kafka/kafkasource_group_offset_to_console.conf");
-        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
-    }
-
-    public void testKafkaTimestampToConsole() throws IOException, 
InterruptedException {
-        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/kafka/kafkasource_timestamp_to_console.conf");
-        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
-    }
-
-}
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java
deleted file mode 100644
index 0c339cccc..000000000
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.flink.v2.kafka;
-
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.com.google.common.collect.Lists;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-
-@Slf4j
-public class KafkaTestBaseIT extends FlinkContainer {
-    protected static final int KAFKA_PORT = 9093;
-
-    protected static final String KAFKA_HOST = "kafkaCluster";
-
-    protected KafkaProducer<byte[], byte[]> producer;
-
-    protected KafkaContainer kafkaContainer;
-
-    @BeforeEach
-    public void startKafkaContainer() {
-        kafkaContainer = new 
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
-            .withNetwork(NETWORK)
-            .withNetworkAliases(KAFKA_HOST)
-            .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
-        kafkaContainer.setPortBindings(Lists.newArrayList(
-            String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
-        Startables.deepStart(Stream.of(kafkaContainer)).join();
-        log.info("Kafka container started");
-        Awaitility.given().ignoreExceptions()
-            .atLeast(100, TimeUnit.MILLISECONDS)
-            .pollInterval(500, TimeUnit.MILLISECONDS)
-            .atMost(180, TimeUnit.SECONDS)
-            .untilAsserted(() -> initKafkaProducer());
-        generateTestData();
-    }
-
-    protected void initKafkaProducer() {
-        Properties props = new Properties();
-        String bootstrapServers = kafkaContainer.getBootstrapServers();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);
-        producer = new KafkaProducer<>(props);
-    }
-
-    @SuppressWarnings("checkstyle:Indentation")
-    protected void generateTestData() {
-
-    }
-
-    @AfterEach
-    public void close() {
-        if (producer != null) {
-            producer.close();
-        }
-        if (kafkaContainer != null) {
-            kafkaContainer.close();
-        }
-    }
-}
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
deleted file mode 100644
index 62a1dc967..000000000
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++ /dev/null
@@ -1,91 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
-  # You can set flink configuration here
-  execution.parallelism = 1
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
-}
-
-source {
-  Kafka {
-    bootstrap.servers = "kafkaCluster:9093"
-    topic = "test_topic"
-    result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
-    schema = {
-      fields {
-           id = bigint
-           c_map = "map<string, smallint>"
-           c_array = "array<tinyint>"
-           c_string = string
-           c_boolean = boolean
-           c_tinyint = tinyint
-           c_smallint = smallint
-           c_int = int
-           c_bigint = bigint
-           c_float = float
-           c_double = double
-           c_decimal = "decimal(2, 1)"
-           c_bytes = bytes
-           c_date = date
-           c_timestamp = timestamp
-      }
-    }
-    # The default format is json, which is optional
-    format = json
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
-}
-
-transform {
-}
-
-sink {
-  Console {}
-  Assert {
-     rules =
-       {
-         field_rules = [
-                {
-             field_name = id
-             field_type = long
-             field_value = [
-                 {
-                     rule_type = NOT_NULL
-                 },
-                 {
-                     rule_type = MIN
-                     rule_value = 0
-                 },
-                 {
-                     rule_type = MAX
-                     rule_value = 99
-                 }
-             ]
-          }
-          ]
-       }
-
-   }
-}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
deleted file mode 100644
index c1b3c0d47..000000000
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
+++ /dev/null
@@ -1,92 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
-  # You can set flink configuration here
-  execution.parallelism = 1
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
-}
-
-source {
-  Kafka {
-    bootstrap.servers = "kafkaCluster:9093"
-    topic = "test_topic"
-    result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
-    schema = {
-      fields {
-           id = bigint
-           c_map = "map<string, smallint>"
-           c_array = "array<tinyint>"
-           c_string = string
-           c_boolean = boolean
-           c_tinyint = tinyint
-           c_smallint = smallint
-           c_int = int
-           c_bigint = bigint
-           c_float = float
-           c_double = double
-           c_decimal = "decimal(2, 1)"
-           c_bytes = bytes
-           c_date = date
-           c_timestamp = timestamp
-      }
-    }
-    format = text
-    # The default field delimiter is ","
-    field_delimiter = ","
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
-}
-
-transform {
-}
-
-sink {
-  Console {}
-  Assert {
-     rules =
-       {
-         field_rules = [
-               {
-            field_name = id
-            field_type = long
-            field_value = [
-                {
-                    rule_type = NOT_NULL
-                },
-                {
-                    rule_type = MIN
-                    rule_value = 0
-                },
-                {
-                    rule_type = MAX
-                    rule_value = 99
-                }
-            ]
-         }
-         ]
-       }
-
-   }
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 6d356eb54..adc3f3610 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -36,7 +36,6 @@
         <module>connector-mongodb-flink-e2e</module>
         <module>connector-iceberg-flink-e2e</module>
         <module>connector-neo4j-flink-e2e</module>
-        <module>connector-kafka-flink-e2e</module>
     </modules>
 
     <dependencies>
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml
deleted file mode 100644
index 5f92a2599..000000000
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml
+++ /dev/null
@@ -1,53 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-       http://www.apache.org/licenses/LICENSE-2.0
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
-        <version>${revision}</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>connector-kafka-spark-e2e</artifactId>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-spark-e2e-base</artifactId>
-            <version>${project.version}</version>
-            <classifier>tests</classifier>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- SeaTunnel connectors -->
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-kafka</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-console</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-</project>
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java
deleted file mode 100644
index de6abf829..000000000
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.spark.v2.kafka;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.util.Collections;
-/**
- * This test case is used to verify that the kafka source is able to send data to the console.
- * Make sure the SeaTunnel job can submit successfully on spark engine.
- */
-@SuppressWarnings("checkstyle:EmptyLineSeparator")
-@Slf4j
-public class KafkaSourceJsonToConsoleIT extends KafkaTestBaseIT {
-
-    @SuppressWarnings("checkstyle:Indentation")
-    protected void generateTestData() {
-
-        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
-                new String[]{
-                        "id",
-                        "c_map",
-                        "c_array",
-                        "c_string",
-                        "c_boolean",
-                        "c_tinyint",
-                        "c_smallint",
-                        "c_int",
-                        "c_bigint",
-                        "c_float",
-                        "c_double",
-                        "c_decimal",
-                        "c_bytes",
-                        "c_date",
-                        "c_timestamp"
-                },
-                new SeaTunnelDataType[]{
-                        BasicType.LONG_TYPE,
-                        new MapType(BasicType.STRING_TYPE, 
BasicType.SHORT_TYPE),
-                        ArrayType.BYTE_ARRAY_TYPE,
-                        BasicType.STRING_TYPE,
-                        BasicType.BOOLEAN_TYPE,
-                        BasicType.BYTE_TYPE,
-                        BasicType.SHORT_TYPE,
-                        BasicType.INT_TYPE,
-                        BasicType.LONG_TYPE,
-                        BasicType.FLOAT_TYPE,
-                        BasicType.DOUBLE_TYPE,
-                        new DecimalType(2, 1),
-                        PrimitiveByteArrayType.INSTANCE,
-                        LocalTimeType.LOCAL_DATE_TYPE,
-                        LocalTimeType.LOCAL_DATE_TIME_TYPE
-                }
-        );
-
-        DefaultSeaTunnelRowSerializer serializer = new 
DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
-
-        for (int i = 0; i < 100; i++) {
-            SeaTunnelRow row = new SeaTunnelRow(
-                    new Object[]{
-                            Long.valueOf(i),
-                            Collections.singletonMap("key", 
Short.parseShort("1")),
-                            new Byte[]{Byte.parseByte("1")},
-                            "string",
-                            Boolean.FALSE,
-                            Byte.parseByte("1"),
-                            Short.parseShort("1"),
-                            Integer.parseInt("1"),
-                            Long.parseLong("1"),
-                            Float.parseFloat("1.1"),
-                            Double.parseDouble("1.1"),
-                            BigDecimal.valueOf(11, 1),
-                            "test".getBytes(),
-                            LocalDate.now(),
-                            LocalDateTime.now()
-                    });
-            ProducerRecord<byte[], byte[]> producerRecord = 
serializer.serializeRow(row);
-            producer.send(producerRecord);
-        }
-    }
-
-    @Test
-    public void testKafkaSource() throws IOException, InterruptedException {
-        Container.ExecResult execResult =  
executeSeaTunnelSparkJob("/kafka/kafkasource_json_to_console.conf");
-        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
-    }
-
-}
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
deleted file mode 100644
index 3ff1faff3..000000000
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.spark.v2.kafka;
-
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-
-import java.io.IOException;
-
-@Slf4j
-public class KafkaSourceStartConfigToConsoleIT extends KafkaTestBaseIT {
-    @Override
-    protected void generateTestData() {
-        generateStepTestData(0, 100);
-    }
-
-    private void generateStepTestData(int start, int end) {
-
-        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
-            new String[]{
-                "id"
-            },
-            new SeaTunnelDataType[]{
-                BasicType.LONG_TYPE
-            }
-        );
-
-        DefaultSeaTunnelRowSerializer serializer = new 
DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
-        for (int i = start; i < end; i++) {
-            SeaTunnelRow row = new SeaTunnelRow(
-                new Object[]{
-                    Long.valueOf(i)
-                });
-            ProducerRecord<byte[], byte[]> producerRecord = 
serializer.serializeRow(row);
-            producer.send(producerRecord);
-        }
-    }
-
-    @Test
-    public void testKafka() throws IOException, InterruptedException {
-        testKafkaLatestToConsole();
-        testKafkaEarliestToConsole();
-        testKafkaSpecificOffsetsToConsole();
-        testKafkaGroupOffsetsToConsole();
-        testKafkaTimestampToConsole();
-    }
-
-    public void testKafkaLatestToConsole() throws IOException, 
InterruptedException {
-        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/kafka/kafkasource_latest_to_console.conf");
-        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
-    }
-
-    public void testKafkaEarliestToConsole() throws IOException, 
InterruptedException {
-        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/kafka/kafkasource_earliest_to_console.conf");
-        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
-    }
-
-    public void testKafkaSpecificOffsetsToConsole() throws IOException, 
InterruptedException {
-        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/kafka/kafkasource_specific_offsets_to_console.conf");
-        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
-    }
-
-    public void testKafkaGroupOffsetsToConsole() throws IOException, 
InterruptedException {
-        generateStepTestData(100, 150);
-        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/kafka/kafkasource_group_offset_to_console.conf");
-        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
-    }
-
-    public void testKafkaTimestampToConsole() throws IOException, 
InterruptedException {
-        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/kafka/kafkasource_timestamp_to_console.conf");
-        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
-    }
-
-}
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
deleted file mode 100644
index f4575f4e8..000000000
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.spark.v2.kafka;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.format.text.TextSerializationSchema;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.util.Collections;
-
-/**
- * This test case is used to verify that the kafka source is able to send data 
to the console.
- * Make sure the SeaTunnel job can submit successfully on spark engine.
- */
-@Slf4j
-public class KafkaSourceTextToConsoleIT extends KafkaTestBaseIT {
-
-    @SuppressWarnings("checkstyle:Indentation")
-    protected void generateTestData() {
-
-        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
-                new String[]{
-                        "id",
-                        "c_map",
-                        "c_array",
-                        "c_string",
-                        "c_boolean",
-                        "c_tinyint",
-                        "c_smallint",
-                        "c_int",
-                        "c_bigint",
-                        "c_float",
-                        "c_double",
-                        "c_decimal",
-                        "c_bytes",
-                        "c_date",
-                        "c_timestamp"
-                },
-                new SeaTunnelDataType[]{
-                        BasicType.LONG_TYPE,
-                        new MapType(BasicType.STRING_TYPE, 
BasicType.SHORT_TYPE),
-                        ArrayType.BYTE_ARRAY_TYPE,
-                        BasicType.STRING_TYPE,
-                        BasicType.BOOLEAN_TYPE,
-                        BasicType.BYTE_TYPE,
-                        BasicType.SHORT_TYPE,
-                        BasicType.INT_TYPE,
-                        BasicType.LONG_TYPE,
-                        BasicType.FLOAT_TYPE,
-                        BasicType.DOUBLE_TYPE,
-                        new DecimalType(2, 1),
-                        PrimitiveByteArrayType.INSTANCE,
-                        LocalTimeType.LOCAL_DATE_TYPE,
-                        LocalTimeType.LOCAL_DATE_TIME_TYPE
-                });
-
-        TextSerializationSchema serializationSchema = 
TextSerializationSchema.builder()
-                .seaTunnelRowType(seatunnelRowType)
-                .delimiter(",")
-                .build();
-
-        for (int i = 0; i < 100; i++) {
-            SeaTunnelRow row = new SeaTunnelRow(
-                    new Object[]{
-                            Long.valueOf(i),
-                            Collections.singletonMap("key", 
Short.parseShort("1")),
-                            new Byte[]{Byte.parseByte("1")},
-                            "string",
-                            Boolean.FALSE,
-                            Byte.parseByte("1"),
-                            Short.parseShort("1"),
-                            Integer.parseInt("1"),
-                            Long.parseLong("1"),
-                            Float.parseFloat("1.1"),
-                            Double.parseDouble("1.1"),
-                            BigDecimal.valueOf(11, 1),
-                            "test".getBytes(),
-                            LocalDate.now(),
-                            LocalDateTime.now()
-                    });
-
-            ProducerRecord<byte[], byte[]> producerRecord = new 
ProducerRecord<>("test_topic", null, serializationSchema.serialize(row));
-            producer.send(producerRecord);
-        }
-    }
-
-    @Test
-    public void testKafkaSourceTextToConsole() throws IOException, 
InterruptedException {
-        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/kafka/kafkasource_text_to_console.conf");
-        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
-    }
-
-}
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java
deleted file mode 100644
index b6d5354a9..000000000
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.spark.v2.kafka;
-
-import org.apache.seatunnel.e2e.spark.SparkContainer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.com.google.common.collect.Lists;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-
-@Slf4j
-public class KafkaTestBaseIT extends SparkContainer {
-    protected static final int KAFKA_PORT = 9094;
-
-    protected static final String KAFKA_HOST = "kafkaCluster";
-
-    protected KafkaProducer<byte[], byte[]> producer;
-
-    protected KafkaContainer kafkaContainer;
-
-    @BeforeEach
-    public void startKafkaContainer() {
-        kafkaContainer = new 
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
-            .withNetwork(NETWORK)
-            .withNetworkAliases(KAFKA_HOST)
-            .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
-        kafkaContainer.setPortBindings(Lists.newArrayList(
-            String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
-        Startables.deepStart(Stream.of(kafkaContainer)).join();
-        log.info("Kafka container started");
-        Awaitility.given().ignoreExceptions()
-            .atLeast(100, TimeUnit.MILLISECONDS)
-            .pollInterval(500, TimeUnit.MILLISECONDS)
-            .atMost(180, TimeUnit.SECONDS)
-            .untilAsserted(() -> initKafkaProducer());
-        generateTestData();
-    }
-
-    protected void initKafkaProducer() {
-        Properties props = new Properties();
-        String bootstrapServers = kafkaContainer.getBootstrapServers();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);
-        producer = new KafkaProducer<>(props);
-    }
-
-    @SuppressWarnings("checkstyle:Indentation")
-    protected void generateTestData() {
-
-    }
-
-    @AfterEach
-    public void close() {
-        if (producer != null) {
-            producer.close();
-        }
-        if (kafkaContainer != null) {
-            kafkaContainer.close();
-        }
-    }
-}
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
deleted file mode 100644
index c1674e192..000000000
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
+++ /dev/null
@@ -1,75 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in 
seatunnel config
-######
-
-env {
-    # You can set spark configuration here
-    job.name = "SeaTunnel"
-    source.parallelism = 1
-    job.mode = "BATCH"
-}
-
-source {
-  Kafka {
-    bootstrap.servers = "kafkaCluster:9094"
-    topic = "test_topic"
-    result_table_name = "kafka_table"
-    # The default format is json, which is optional
-    format = json
-    start_mode = earliest
-    schema = {
-       fields {
-            id = bigint
-       }
-     }
-  }
-
-  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
-}
-
-transform {
- }
-
-sink  {
-       Console {}
-
-         Assert {
-                  rules =
-                    {
-                      field_rules = [
-                          {
-                         field_name = id
-                         field_type = long
-                         field_value = [
-
-                             {
-                                 rule_type = MIN
-                                 rule_value = 0
-                             },
-                             {
-                                 rule_type = MAX
-                                 rule_value = 99
-                             }
-                         ]
-                      }
-                      ]
-                    }
-                  }
-      }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
deleted file mode 100644
index 336d364c0..000000000
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
+++ /dev/null
@@ -1,75 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in 
seatunnel config
-######
-
-env {
-    # You can set spark configuration here
-    job.name = "SeaTunnel"
-    source.parallelism = 1
-    job.mode = "BATCH"
-}
-
-source {
-  Kafka {
-    bootstrap.servers = "kafkaCluster:9094"
-    topic = "test_topic"
-    result_table_name = "kafka_table"
-    # The default format is json, which is optional
-    format = json
-    start_mode = group_offsets
-    schema = {
-       fields {
-            id = bigint
-       }
-     }
-  }
-
-  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
-}
-
-transform {
- }
-
-sink  {
-       Console {}
-
-         Assert {
-                  rules =
-                    {
-                      field_rules = [
-                          {
-                         field_name = id
-                         field_type = long
-                         field_value = [
-
-                             {
-                                 rule_type = MIN
-                                 rule_value = 100
-                             },
-                             {
-                                 rule_type = MAX
-                                 rule_value = 149
-                             }
-                         ]
-                      }
-                      ]
-                    }
-                  }
-      }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
deleted file mode 100644
index 0c20ebc9a..000000000
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
+++ /dev/null
@@ -1,75 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in 
seatunnel config
-######
-
-env {
-    # You can set spark configuration here
-    job.name = "SeaTunnel"
-    source.parallelism = 1
-    job.mode = "BATCH"
-}
-
-source {
-  Kafka {
-    bootstrap.servers = "kafkaCluster:9094"
-    topic = "test_topic"
-    result_table_name = "kafka_table"
-    # The default format is json, which is optional
-    format = json
-    start_mode = latest
-    schema = {
-       fields {
-            id = bigint
-       }
-     }
-  }
-
-  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
-}
-
-transform {
- }
-
-sink  {
-       Console {}
-
-         Assert {
-                  rules =
-                    {
-                      field_rules = [
-                          {
-                         field_name = id
-                         field_type = long
-                         field_value = [
-
-                             {
-                                 rule_type = MIN
-                                 rule_value = 99
-                             },
-                             {
-                                 rule_type = MAX
-                                 rule_value = 99
-                             }
-                         ]
-                      }
-                      ]
-                    }
-                  }
-      }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
deleted file mode 100644
index 8f6f00a68..000000000
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
+++ /dev/null
@@ -1,79 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in 
seatunnel config
-######
-
-env {
-    # You can set spark configuration here
-    job.name = "SeaTunnel"
-    source.parallelism = 1
-    job.mode = "BATCH"
-}
-
-source {
-  Kafka {
-    bootstrap.servers = "kafkaCluster:9094"
-    topic = "test_topic"
-    result_table_name = "kafka_table"
-    # The default format is json, which is optional
-    format = json
-    start_mode = specific_offsets
-    schema = {
-       fields {
-            id = bigint
-       }
-     }
-
-    start_mode.offsets = {
-                test_topic-0 = 50
-             }
-  }
-
-  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
-}
-
-transform {
- }
-
-sink  {
-       Console {}
-
-         Assert {
-                  rules =
-                    {
-                      field_rules = [
-                          {
-                         field_name = id
-                         field_type = long
-                         field_value = [
-
-                             {
-                                 rule_type = MIN
-                                 rule_value = 50
-                             },
-                             {
-                                 rule_type = MAX
-                                 rule_value = 99
-                             }
-                         ]
-                      }
-                      ]
-                    }
-                  }
-      }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
deleted file mode 100644
index e7ec35c7d..000000000
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
+++ /dev/null
@@ -1,76 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in 
seatunnel config
-######
-
-env {
-    # You can set spark configuration here
-    job.app.name = "SeaTunnel"
-    source.parallelism = 1
-    job.mode = "BATCH"
-}
-
-source {
-  Kafka {
-    bootstrap.servers = "kafkaCluster:9094"
-    topic = "test_topic"
-    result_table_name = "kafka_table"
-    # The default format is json, which is optional
-    format = json
-    start_mode = timestamp
-    schema = {
-       fields {
-            id = bigint
-       }
-     }
-     start_mode.timestamp = 1667179890315
-  }
-
-  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
-}
-
-transform {
- }
-
-sink  {
-       Console {}
-
-         Assert {
-                  rules =
-                    {
-                      field_rules = [
-                          {
-                         field_name = id
-                         field_type = long
-                         field_value = [
-
-                             {
-                                 rule_type = MIN
-                                 rule_value = 0
-                             },
-                             {
-                                 rule_type = MAX
-                                 rule_value = 149
-                             }
-                         ]
-                      }
-                      ]
-                    }
-                  }
-      }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index c0289e789..7f882095b 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -35,7 +35,6 @@
         <module>connector-jdbc-spark-e2e</module>
         <module>connector-mongodb-spark-e2e</module>
         <module>connector-neo4j-spark-e2e</module>
-        <module>connector-kafka-spark-e2e</module>
         <module>connector-iceberg-spark-e2e</module>
     </modules>
 

Reply via email to