This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 694bbdd7 [Api-draft] Add seatunnel kafka source connectors  (#1949)
694bbdd7 is described below

commit 694bbdd709110e647628378a44bdf3347754dfe9
Author: TrickyZerg <[email protected]>
AuthorDate: Thu May 26 16:04:51 2022 +0800

    [Api-draft] Add seatunnel kafka source connectors  (#1949)
    
    * add seatunnel kafka connector
    * fix ParallelSource can't stop when the thread pool has an error.
---
 .../api/source/SourceSplitEnumerator.java          |   2 +-
 .../apache/seatunnel/flink/FlinkEnvironment.java   |  13 ++-
 .../connectors/seatunnel/kafka/config/Config.java  |  10 ++
 .../kafka/sink/KafkaNoTransactionSender.java       |   6 +-
 .../seatunnel/kafka/sink/KafkaProduceSender.java   |   6 +-
 .../connectors/seatunnel/kafka/sink/KafkaSink.java |  10 +-
 .../seatunnel/kafka/sink/KafkaSinkWriter.java      |   8 +-
 .../kafka/sink/KafkaTransactionSender.java         |  10 +-
 .../seatunnel/kafka/source/ConsumerMetadata.java   |  73 ++++++++++++
 .../seatunnel/kafka/source/KafkaSource.java        |  57 +++++++---
 .../seatunnel/kafka/source/KafkaSourceReader.java  | 117 +++++++++++++++++--
 .../seatunnel/kafka/source/KafkaSourceSplit.java   |  46 +++++++-
 .../kafka/source/KafkaSourceSplitEnumerator.java   | 126 +++++++++++++++------
 .../state/{KafkaState.java => KafkaSinkState.java} |   2 +-
 .../{KafkaState.java => KafkaSourceState.java}     |  23 ++--
 .../core/starter/flink/env/FlinkEnvironment.java   |  13 ++-
 .../flink/execution/FlinkTaskExecution.java        |  27 +----
 .../spark/execution/SparkTaskExecution.java        |  21 +---
 .../translation/source/ParallelSource.java         |  22 +++-
 .../translation/spark/sink/SparkDataWriter.java    |   2 +-
 20 files changed, 442 insertions(+), 152 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
index 4b97cee6..3b1917bd 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
@@ -37,7 +37,7 @@ public interface SourceSplitEnumerator<SplitT extends 
SourceSplit, StateT> exten
     /**
      * The method is executed by the engine only once.
      */
-    void run();
+    void run() throws Exception;
 
     /**
      * Called to close the enumerator, in case it holds on to any resources, 
like threads or network
diff --git 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
index a2d57234..eaf6780d 100644
--- 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
+++ 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.flink.util.EnvironmentUtil;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
@@ -87,10 +88,10 @@ public class FlinkEnvironment implements RuntimeEnv {
 
     @Override
     public FlinkEnvironment prepare() {
-        if (isStreaming()) {
-            createStreamEnvironment();
-            createStreamTableEnvironment();
-        } else {
+        // Batch/Streaming both use data stream api in SeaTunnel New API
+        createStreamEnvironment();
+        createStreamTableEnvironment();
+        if (!isStreaming()) {
             createExecutionEnvironment();
             createBatchTableEnvironment();
         }
@@ -201,6 +202,10 @@ public class FlinkEnvironment implements RuntimeEnv {
             int max = config.getInt(ConfigKeyName.MAX_PARALLELISM);
             environment.setMaxParallelism(max);
         }
+
+        if (this.jobMode.equals(JobMode.BATCH)) {
+            environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        }
     }
 
     public ExecutionEnvironment getBatchEnvironment() {
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 1eebcdb5..923348f4 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -23,10 +23,20 @@ public class Config {
      */
     public static final String TOPIC = "topic";
 
+    /**
+     * The topic of kafka is java pattern or list.
+     */
+    public static final String PATTERN = "pattern";
+
     /**
      * The server address of kafka cluster.
      */
     public static final String BOOTSTRAP_SERVER = "bootstrap.server";
 
     public static final String KAFKA_CONFIG_PREFIX = "kafka.";
+
+    /**
+     * consumer group of kafka client consume message.
+     */
+    public static final String CONSUMER_GROUP = "consumer.group";
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
index 3691acd9..4760830f 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -63,12 +63,12 @@ public class KafkaNoTransactionSender<K, V> implements 
KafkaProduceSender<K, V>
     }
 
     @Override
-    public void abortTransaction(List<KafkaState> kafkaStates) {
+    public void abortTransaction(List<KafkaSinkState> kafkaStates) {
         // no-op
     }
 
     @Override
-    public List<KafkaState> snapshotState() {
+    public List<KafkaSinkState> snapshotState() {
         kafkaProducer.flush();
         return Collections.emptyList();
     }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
index 9836cb9e..f1d5ce51 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 
@@ -52,13 +52,13 @@ public interface KafkaProduceSender<K, V> extends 
AutoCloseable {
      *
      * @param kafkaStates kafka states about the transaction info.
      */
-    void abortTransaction(List<KafkaState> kafkaStates);
+    void abortTransaction(List<KafkaSinkState> kafkaStates);
 
     /**
      * Get the current kafka state of the sender.
      *
      * @return kafka state List, or empty if no state is available.
      */
-    List<KafkaState> snapshotState();
+    List<KafkaSinkState> snapshotState();
 
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index 5e63d79c..70cfacdb 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -27,7 +27,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -42,7 +42,7 @@ import java.util.Optional;
  * This class contains the method to create {@link KafkaSinkWriter} and {@link 
KafkaSinkCommitter}.
  */
 @AutoService(SeaTunnelSink.class)
-public class KafkaSink implements SeaTunnelSink<SeaTunnelRow, KafkaState, 
KafkaCommitInfo, KafkaAggregatedCommitInfo> {
+public class KafkaSink implements SeaTunnelSink<SeaTunnelRow, KafkaSinkState, 
KafkaCommitInfo, KafkaAggregatedCommitInfo> {
 
     private Config pluginConfig;
     private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
@@ -58,17 +58,17 @@ public class KafkaSink implements 
SeaTunnelSink<SeaTunnelRow, KafkaState, KafkaC
     }
 
     @Override
-    public SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaState> 
createWriter(SinkWriter.Context context) {
+    public SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaSinkState> 
createWriter(SinkWriter.Context context) {
         return new KafkaSinkWriter(context, seaTunnelRowTypeInfo, 
pluginConfig, Collections.emptyList());
     }
 
     @Override
-    public SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaState> 
restoreWriter(SinkWriter.Context context, List<KafkaState> states) {
+    public SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaSinkState> 
restoreWriter(SinkWriter.Context context, List<KafkaSinkState> states) {
         return new KafkaSinkWriter(context, seaTunnelRowTypeInfo, 
pluginConfig, states);
     }
 
     @Override
-    public Optional<Serializer<KafkaState>> getWriterStateSerializer() {
+    public Optional<Serializer<KafkaSinkState>> getWriterStateSerializer() {
         return Optional.of(new DefaultSerializer<>());
     }
 
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 6d3a122c..6605f5bc 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -25,7 +25,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -40,7 +40,7 @@ import java.util.Properties;
 /**
  * KafkaSinkWriter is a sink writer that will write {@link SeaTunnelRow} to 
Kafka.
  */
-public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, 
KafkaCommitInfo, KafkaState> {
+public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, 
KafkaCommitInfo, KafkaSinkState> {
 
     private final SinkWriter.Context context;
     private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
@@ -61,7 +61,7 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
         SinkWriter.Context context,
         SeaTunnelRowTypeInfo seaTunnelRowTypeInfo,
         Config pluginConfig,
-        List<KafkaState> kafkaStates) {
+        List<KafkaSinkState> kafkaStates) {
         this.context = context;
         this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
         this.pluginConfig = pluginConfig;
@@ -77,7 +77,7 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
     }
 
     @Override
-    public List<KafkaState> snapshotState() {
+    public List<KafkaSinkState> snapshotState() {
         return kafkaProducerSender.snapshotState();
     }
 
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
index 766313c4..cf7da852 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
 
 import com.google.common.collect.Lists;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -73,11 +73,11 @@ public class KafkaTransactionSender<K, V> implements 
KafkaProduceSender<K, V> {
     }
 
     @Override
-    public void abortTransaction(List<KafkaState> kafkaStates) {
+    public void abortTransaction(List<KafkaSinkState> kafkaStates) {
         if (kafkaStates.isEmpty()) {
             return;
         }
-        for (KafkaState kafkaState : kafkaStates) {
+        for (KafkaSinkState kafkaState : kafkaStates) {
             // create the transaction producer
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug("Abort kafka transaction: {}", 
kafkaState.getTransactionId());
@@ -89,8 +89,8 @@ public class KafkaTransactionSender<K, V> implements 
KafkaProduceSender<K, V> {
     }
 
     @Override
-    public List<KafkaState> snapshotState() {
-        return Lists.newArrayList(new KafkaState(transactionId, 
kafkaProperties));
+    public List<KafkaSinkState> snapshotState() {
+        return Lists.newArrayList(new KafkaSinkState(transactionId, 
kafkaProperties));
     }
 
     @Override
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
new file mode 100644
index 00000000..f52b6146
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * Kafka consumer metadata, include topic, bootstrap server etc.
+ */
+public class ConsumerMetadata implements Serializable {
+
+    private String topic;
+    private boolean isPattern = false;
+    private String bootstrapServer;
+    private Properties properties;
+    private String consumerGroup;
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public boolean isPattern() {
+        return isPattern;
+    }
+
+    public void setPattern(boolean pattern) {
+        isPattern = pattern;
+    }
+
+    public String getBootstrapServer() {
+        return bootstrapServer;
+    }
+
+    public void setBootstrapServer(String bootstrapServer) {
+        this.bootstrapServer = bootstrapServer;
+    }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 82db59e0..5d3f98b9 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -18,6 +18,8 @@
 package org.apache.seatunnel.connectors.seatunnel.kafka.source;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVER;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
@@ -32,19 +34,23 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.google.auto.service.AutoService;
+
 import java.util.Properties;
 
-public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, 
KafkaSourceSplit, KafkaState> {
+@AutoService(SeaTunnelSource.class)
+public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, 
KafkaSourceSplit, KafkaSourceState> {
 
+    private static final String DEFAULT_CONSUMER_GROUP = 
"SeaTunnel-Consumer-Group";
 
-    private String topic;
-    private String bootstrapServer;
-    private Properties properties;
+    private final ConsumerMetadata metadata = new ConsumerMetadata();
+    private SeaTunnelRowTypeInfo typeInfo;
 
     @Override
     public String getPluginName() {
@@ -57,36 +63,51 @@ public class KafkaSource implements 
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
         }
-        this.topic = config.getString(TOPIC);
-        this.bootstrapServer = config.getString(BOOTSTRAP_SERVER);
-        // TODO add user custom properties
-        this.properties = new Properties();
+        this.metadata.setTopic(config.getString(TOPIC));
+        if (config.hasPath(PATTERN)) {
+            this.metadata.setPattern(config.getBoolean(PATTERN));
+        }
+        this.metadata.setBootstrapServer(config.getString(BOOTSTRAP_SERVER));
+        this.metadata.setProperties(new Properties());
+
+        if (config.hasPath(CONSUMER_GROUP)) {
+            this.metadata.setConsumerGroup(config.getString(CONSUMER_GROUP));
+        } else {
+            this.metadata.setConsumerGroup(DEFAULT_CONSUMER_GROUP);
+        }
+
+        TypesafeConfigUtils.extractSubConfig(config, "kafka.", 
false).entrySet().forEach(e -> {
+            this.metadata.getProperties().put(e.getKey(), 
String.valueOf(e.getValue().unwrapped()));
+        });
+
+        // TODO support user custom row type
+        this.typeInfo = new SeaTunnelRowTypeInfo(new String[]{"topic", 
"raw_message"},
+                new SeaTunnelDataType[]{BasicType.STRING, BasicType.STRING});
+
     }
 
     @Override
     public SeaTunnelRowTypeInfo getRowTypeInfo() {
-        return new SeaTunnelRowTypeInfo(new String[]{"topic", "raw_message"},
-                new SeaTunnelDataType[]{BasicType.STRING, BasicType.STRING});
+        return this.typeInfo;
     }
 
     @Override
     public SourceReader<SeaTunnelRow, KafkaSourceSplit> 
createReader(SourceReader.Context readerContext) throws Exception {
-        return new KafkaSourceReader(this.topic, this.bootstrapServer);
+        return new KafkaSourceReader(this.metadata, this.typeInfo, 
readerContext);
     }
 
     @Override
-    public SourceSplitEnumerator<KafkaSourceSplit, KafkaState> 
createEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit> 
enumeratorContext) throws Exception {
-        return new KafkaSourceSplitEnumerator(this.topic, 
this.bootstrapServer, this.properties);
+    public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> 
createEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit> 
enumeratorContext) throws Exception {
+        return new KafkaSourceSplitEnumerator(this.metadata, 
enumeratorContext);
     }
 
     @Override
-    public SourceSplitEnumerator<KafkaSourceSplit, KafkaState> 
restoreEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit> 
enumeratorContext, KafkaState checkpointState) throws Exception {
-        // TODO support state restore
-        return new KafkaSourceSplitEnumerator(this.topic, 
this.bootstrapServer, this.properties);
+    public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> 
restoreEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit> 
enumeratorContext, KafkaSourceState checkpointState) throws Exception {
+        return new KafkaSourceSplitEnumerator(this.metadata, 
enumeratorContext, checkpointState);
     }
 
     @Override
-    public Serializer<KafkaState> getEnumeratorStateSerializer() {
+    public Serializer<KafkaSourceState> getEnumeratorStateSerializer() {
         return new DefaultSerializer<>();
     }
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index cb4bc889..3158858c 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -17,55 +17,148 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.source;
 
+import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+
+import com.google.common.collect.Maps;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 public class KafkaSourceReader implements SourceReader<SeaTunnelRow, 
KafkaSourceSplit> {
 
-    private final String topic;
-    private final String bootstrapServer;
-
-    KafkaSourceReader(String topic, String bootstrapServer) {
-        this.topic = topic;
-        this.bootstrapServer = bootstrapServer;
+    private static final long THREAD_WAIT_TIME = 500L;
+    private static final long POLL_TIMEOUT = 10000L;
+    private static final String CLIENT_ID_PREFIX = "seatunnel";
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaSourceReader.class);
+
+    private final SourceReader.Context context;
+    private KafkaConsumer<byte[], byte[]> consumer;
+    private final ConsumerMetadata metadata;
+    private final Set<KafkaSourceSplit> sourceSplits;
+    private final Map<TopicPartition, Long> endOffset;
+    // TODO support user custom type
+    private SeaTunnelRowTypeInfo typeInfo;
+
+    KafkaSourceReader(ConsumerMetadata metadata, SeaTunnelRowTypeInfo typeInfo,
+                      SourceReader.Context context) {
+        this.metadata = metadata;
+        this.context = context;
+        this.typeInfo = typeInfo;
+        this.sourceSplits = new HashSet<>();
+        this.endOffset = new HashMap<>();
     }
 
     @Override
     public void open() {
-
+        this.consumer = initConsumer(this.metadata.getBootstrapServer(), 
this.metadata.getConsumerGroup(),
+                this.metadata.getProperties());
     }
 
     @Override
     public void close() throws IOException {
-
+        if (consumer != null) {
+            consumer.close();
+        }
     }
 
     @Override
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
-
+        if (sourceSplits.isEmpty() || sourceSplits.size() != 
this.endOffset.size()) {
+            Thread.sleep(THREAD_WAIT_TIME);
+            return;
+        }
+        Set<TopicPartition> partitions = convertToPartition(sourceSplits);
+        StringDeserializer stringDeserializer = new StringDeserializer();
+        
stringDeserializer.configure(Maps.fromProperties(this.metadata.getProperties()),
 false);
+        consumer.assign(partitions);
+        while (true) {
+            ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
+            for (TopicPartition partition : partitions) {
+                for (ConsumerRecord<byte[], byte[]> record : 
records.records(partition)) {
+
+                    String v = 
stringDeserializer.deserialize(partition.topic(), record.value());
+                    String t = partition.topic();
+                    output.collect(new SeaTunnelRow(new Object[]{t, v}));
+
+                    if (Boundedness.BOUNDED.equals(context.getBoundedness()) &&
+                            record.offset() >= endOffset.get(partition)) {
+                        break;
+                    }
+                }
+            }
+
+            if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+                // signal to the source that we have reached the end of the 
data.
+                context.signalNoMoreElement();
+                break;
+            }
+        }
     }
 
     @Override
     public List<KafkaSourceSplit> snapshotState(long checkpointId) throws 
Exception {
-        return null;
+        return new ArrayList<>(sourceSplits);
     }
 
     @Override
     public void addSplits(List<KafkaSourceSplit> splits) {
-
+        sourceSplits.addAll(splits);
+        sourceSplits.forEach(partition -> {
+            endOffset.put(partition.getTopicPartition(), 
partition.getEndOffset());
+        });
     }
 
     @Override
     public void handleNoMoreSplits() {
-
+        LOGGER.info("receive no more splits message, this reader will not add 
new split.");
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // TODO commit offset
+    }
 
+    private KafkaConsumer<byte[], byte[]> initConsumer(String bootstrapServer, 
String consumerGroup,
+                                                       Properties properties) {
+        Properties props = new Properties(properties);
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServer);
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + 
"-enumerator-consumer");
+
+        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+
+        // Disable auto create topics feature
+        props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, 
"false");
+        return new KafkaConsumer<>(props);
     }
+
+    private Set<TopicPartition> 
convertToPartition(Collection<KafkaSourceSplit> sourceSplits) {
+        return 
sourceSplits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toSet());
+    }
+
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
index ebaa72ec..be2348e9 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
@@ -19,10 +19,54 @@ package 
org.apache.seatunnel.connectors.seatunnel.kafka.source;
 
 import org.apache.seatunnel.api.source.SourceSplit;
 
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Objects;
+
 public class KafkaSourceSplit implements SourceSplit {
 
+    private TopicPartition topicPartition;
+    private long endOffset = -1L;
+
+    public KafkaSourceSplit(TopicPartition topicPartition) {
+        this.topicPartition = topicPartition;
+    }
+
+    public long getEndOffset() {
+        return endOffset;
+    }
+
+    public void setEndOffset(long endOffset) {
+        this.endOffset = endOffset;
+    }
+
+    public TopicPartition getTopicPartition() {
+        return topicPartition;
+    }
+
+    public void setTopicPartition(TopicPartition topicPartition) {
+        this.topicPartition = topicPartition;
+    }
+
     @Override
     public String splitId() {
-        return null;
+        return topicPartition.topic() + "-" + topicPartition.partition();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        KafkaSourceSplit that = (KafkaSourceSplit) o;
+        return Objects.equals(topicPartition, that.topicPartition);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(topicPartition, endOffset);
     }
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 663be8f1..fe652d25 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -18,97 +18,151 @@
 package org.apache.seatunnel.connectors.seatunnel.kafka.source;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
 
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.TopicPartition;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
-public class KafkaSourceSplitEnumerator implements 
SourceSplitEnumerator<KafkaSourceSplit, KafkaState> {
+public class KafkaSourceSplitEnumerator implements 
SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> {
 
     private static final String CLIENT_ID_PREFIX = "seatunnel";
 
-    private final String topic;
-    private final String bootstrapServer;
-    private KafkaConsumer<byte[], byte[]> consumer;
+    private final ConsumerMetadata metadata;
+    private final Context<KafkaSourceSplit> context;
     private AdminClient adminClient;
 
-    KafkaSourceSplitEnumerator(String topic, String bootstrapServer, 
Properties properties) {
-        this.topic = topic;
-        this.bootstrapServer = bootstrapServer;
+    private Set<KafkaSourceSplit> pendingSplit;
+    private Set<KafkaSourceSplit> assignedSplit;
+
+    KafkaSourceSplitEnumerator(ConsumerMetadata metadata, 
Context<KafkaSourceSplit> context) {
+        this.metadata = metadata;
+        this.context = context;
+    }
+
+    KafkaSourceSplitEnumerator(ConsumerMetadata metadata, 
Context<KafkaSourceSplit> context,
+                               KafkaSourceState sourceState) {
+        this(metadata, context);
+        this.assignedSplit = sourceState.getAssignedSplit();
     }
 
     @Override
     public void open() {
-        this.consumer = initConsumer();
-        this.adminClient = initAdminClient();
+        this.adminClient = initAdminClient(this.metadata.getProperties());
+        this.assignedSplit = new HashSet<>();
+        this.pendingSplit = new HashSet<>();
     }
 
     @Override
-    public void run() {
-
+    public void run() throws ExecutionException, InterruptedException {
+        pendingSplit = getTopicInfo();
+        assignSplit(context.registeredReaders());
     }
 
     @Override
     public void close() throws IOException {
-
+        if (this.adminClient != null) {
+            adminClient.close();
+        }
     }
 
     @Override
     public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
-
+        if (!splits.isEmpty()) {
+            pendingSplit.addAll(splits);
+            assignSplit(Collections.singletonList(subtaskId));
+        }
     }
 
     @Override
     public int currentUnassignedSplitSize() {
-        return 0;
+        return pendingSplit.size();
     }
 
     @Override
     public void handleSplitRequest(int subtaskId) {
-
+        // Do nothing because Kafka source push split.
     }
 
     @Override
     public void registerReader(int subtaskId) {
-
+        if (!pendingSplit.isEmpty()) {
+            assignSplit(Collections.singletonList(subtaskId));
+        }
     }
 
     @Override
-    public KafkaState snapshotState(long checkpointId) throws Exception {
-        return null;
+    public KafkaSourceState snapshotState(long checkpointId) throws Exception {
+        return new KafkaSourceState(assignedSplit);
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // Do nothing
+    }
 
+    private AdminClient initAdminClient(Properties properties) {
+        Properties props = new Properties(properties);
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
this.metadata.getBootstrapServer());
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + 
"-enumerator-admin-client");
+        return AdminClient.create(props);
     }
 
-    private KafkaConsumer<byte[], byte[]> initConsumer() {
-        Properties props = new Properties();
+    private Set<KafkaSourceSplit> getTopicInfo() throws ExecutionException, 
InterruptedException {
+        Collection<String> topics;
+        if (this.metadata.isPattern()) {
+            Pattern pattern = Pattern.compile(this.metadata.getTopic());
+            topics = this.adminClient.listTopics().names().get().stream()
+                    .filter(t -> 
pattern.matcher(t).matches()).collect(Collectors.toSet());
+        } else {
+            topics = Arrays.asList(this.metadata.getTopic().split(","));
+        }
+        Collection<TopicPartition> partitions =
+                
adminClient.describeTopics(topics).allTopicNames().get().values().stream().flatMap(t
 -> t.partitions().stream()
+                        .map(p -> new TopicPartition(t.name(), 
p.partition()))).collect(Collectors.toSet());
+        return 
adminClient.listOffsets(partitions.stream().collect(Collectors.toMap(p -> p, p 
-> OffsetSpec.latest())))
+                .all().get().entrySet().stream().map(partition -> {
+                    KafkaSourceSplit split = new 
KafkaSourceSplit(partition.getKey());
+                    split.setEndOffset(partition.getValue().offset());
+                    return split;
+                }).collect(Collectors.toSet());
+    }
 
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + 
"-enumerator-consumer");
+    private void assignSplit(Collection<Integer> taskIDList) {
+        Map<Integer, List<KafkaSourceSplit>> readySplit = new 
HashMap<>(Common.COLLECTION_SIZE);
+        for (int taskID : taskIDList) {
+            readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
+        }
+        pendingSplit.forEach(s -> 
readySplit.get(getSplitOwner(s.getTopicPartition(), taskIDList.size()))
+                .add(s));
 
-        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                ByteArrayDeserializer.class.getName());
-        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                ByteArrayDeserializer.class.getName());
+        readySplit.forEach(context::assignSplit);
 
-        // Disable auto create topics feature
-        props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, 
"false");
-        return new KafkaConsumer<>(props);
+        assignedSplit.addAll(pendingSplit);
+        pendingSplit.clear();
     }
 
-    private AdminClient initAdminClient() {
-        Properties props = new Properties();
-
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + 
"-enumerator-admin-client");
-        return AdminClient.create(props);
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private static int getSplitOwner(TopicPartition tp, int numReaders) {
+        int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % 
numReaders;
+        return (startIndex + tp.partition()) % numReaders;
     }
 
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaSinkState.java
similarity index 95%
copy from 
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
copy to 
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaSinkState.java
index 7ab11ddf..311d1e29 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaSinkState.java
@@ -25,7 +25,7 @@ import java.util.Properties;
 
 @Data
 @AllArgsConstructor
-public class KafkaState implements Serializable {
+public class KafkaSinkState implements Serializable {
 
     private final String transactionId;
     private final Properties kafkaProperties;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaSourceState.java
similarity index 62%
rename from 
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
rename to 
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaSourceState.java
index 7ab11ddf..e6ba535b 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaSourceState.java
@@ -17,17 +17,24 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.state;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
+import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
 
 import java.io.Serializable;
-import java.util.Properties;
+import java.util.Set;
 
-@Data
-@AllArgsConstructor
-public class KafkaState implements Serializable {
+public class KafkaSourceState implements Serializable {
 
-    private final String transactionId;
-    private final Properties kafkaProperties;
+    private Set<KafkaSourceSplit> assignedSplit;
 
+    public KafkaSourceState(Set<KafkaSourceSplit> assignedSplit) {
+        this.assignedSplit = assignedSplit;
+    }
+
+    public Set<KafkaSourceSplit> getAssignedSplit() {
+        return assignedSplit;
+    }
+
+    public void setAssignedSplit(Set<KafkaSourceSplit> assignedSplit) {
+        this.assignedSplit = assignedSplit;
+    }
 }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/env/FlinkEnvironment.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/env/FlinkEnvironment.java
index 337e6c8d..bd216dfa 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/env/FlinkEnvironment.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/env/FlinkEnvironment.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.flink.util.EnvironmentUtil;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
@@ -87,10 +88,10 @@ public class FlinkEnvironment implements RuntimeEnv {
 
     @Override
     public FlinkEnvironment prepare() {
-        if (isStreaming()) {
-            createStreamEnvironment();
-            createStreamTableEnvironment();
-        } else {
+        // Batch/Streaming both use data stream api in SeaTunnel New API
+        createStreamEnvironment();
+        createStreamTableEnvironment();
+        if (!isStreaming()) {
             createExecutionEnvironment();
             createBatchTableEnvironment();
         }
@@ -201,6 +202,10 @@ public class FlinkEnvironment implements RuntimeEnv {
             int max = config.getInt(ConfigKeyName.MAX_PARALLELISM);
             environment.setMaxParallelism(max);
         }
+
+        if (this.jobMode.equals(JobMode.BATCH)) {
+            environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        }
     }
 
     public ExecutionEnvironment getBatchEnvironment() {
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
index a2fcd10f..c7e8759b 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
@@ -18,7 +18,8 @@
 package org.apache.seatunnel.core.starter.flink.execution;
 
 import org.apache.seatunnel.api.common.SeaTunnelContext;
-import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.core.starter.config.EngineType;
+import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -46,7 +47,8 @@ public class FlinkTaskExecution {
 
     public FlinkTaskExecution(Config config) {
         this.config = config;
-        this.flinkEnvironment = createFlinkEnvironment();
+        this.flinkEnvironment = (FlinkEnvironment) new 
EnvironmentFactory<>(config, EngineType.FLINK).getEnvironment();
+        
SeaTunnelContext.getContext().setJobMode(flinkEnvironment.getJobMode());
         this.sourcePluginExecuteProcessor = new 
SourceExecuteProcessor(flinkEnvironment, config.getConfigList("source"));
         this.transformPluginExecuteProcessor = new 
TransformExecuteProcessor(flinkEnvironment, config.getConfigList("transform"));
         this.sinkPluginExecuteProcessor = new 
SinkExecuteProcessor(flinkEnvironment, config.getConfigList("sink"));
@@ -61,25 +63,4 @@ public class FlinkTaskExecution {
         LOGGER.info("Flink Execution Plan:{}", 
flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
         
flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
     }
-
-    private FlinkEnvironment createFlinkEnvironment() {
-        // todo: we need to split the new api into a separate module.
-        // we override the environment here, since we need to create 
StreamExecutionEnvironment.
-        FlinkEnvironment flinkEnvironment = new FlinkEnvironment() {
-            @Override
-            public boolean isStreaming() {
-                return true;
-            }
-        };
-        Config envConfig = config.getConfig("env");
-        JobMode jobMode = JobMode.STREAMING;
-        if (envConfig.hasPath("job.mode")) {
-            jobMode = envConfig.getEnum(JobMode.class, "job.mode");
-        }
-        SeaTunnelContext.getContext().setJobMode(jobMode);
-        flinkEnvironment.setConfig(envConfig)
-            .setJobMode(jobMode)
-            .prepare();
-        return flinkEnvironment;
-    }
 }
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
index 1e2b5aa4..5ddd0474 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.core.starter.spark.execution;
 
 import org.apache.seatunnel.api.common.SeaTunnelContext;
-import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.core.starter.config.EngineType;
 import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
 import org.apache.seatunnel.spark.SparkEnvironment;
@@ -45,7 +44,8 @@ public class SparkTaskExecution {
 
     public SparkTaskExecution(Config config) {
         this.config = config;
-        this.sparkEnvironment = getSparkEnvironment(config);
+        this.sparkEnvironment = (SparkEnvironment) new 
EnvironmentFactory<>(config, EngineType.SPARK).getEnvironment();
+        
SeaTunnelContext.getContext().setJobMode(sparkEnvironment.getJobMode());
         this.sourcePluginExecuteProcessor = new 
SourceExecuteProcessor(sparkEnvironment, config.getConfigList("source"));
         this.transformPluginExecuteProcessor = new 
TransformExecuteProcessor(sparkEnvironment, config.getConfigList("transform"));
         this.sinkPluginExecuteProcessor = new 
SinkExecuteProcessor(sparkEnvironment, config.getConfigList("sink"));
@@ -59,21 +59,4 @@ public class SparkTaskExecution {
 
         LOGGER.info("Spark Execution started");
     }
-
-    private SparkEnvironment getSparkEnvironment(Config config) {
-        SparkEnvironment sparkEnvironment = (SparkEnvironment) new 
EnvironmentFactory<>(config, EngineType.SPARK).getEnvironment();
-
-        Config envConfig = config.getConfig("env");
-        JobMode jobMode = JobMode.STREAMING;
-        if (envConfig.hasPath("job.mode")) {
-            jobMode = envConfig.getEnum(JobMode.class, "job.mode");
-        }
-        SeaTunnelContext.getContext().setJobMode(jobMode);
-
-        sparkEnvironment.setJobMode(JobMode.STREAMING)
-            .setConfig(config)
-            .prepare();
-
-        return sparkEnvironment;
-    }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 2e55e84c..83e61f78 100644
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 public class ParallelSource<T, SplitT extends SourceSplit, StateT> implements 
AutoCloseable, CheckpointListener {
@@ -101,8 +102,18 @@ public class ParallelSource<T, SplitT extends SourceSplit, 
StateT> implements Au
     }
 
     public void run(Collector<T> collector) throws Exception {
-        executorService.execute(() -> splitEnumerator.run());
+        Future<?> future = executorService.submit(() -> {
+            try {
+                splitEnumerator.run();
+            } catch (Exception e) {
+                throw new RuntimeException("SourceSplitEnumerator run 
failed.", e);
+            }
+        });
+
         while (running) {
+            if (future.isDone()) {
+                future.get();
+            }
             reader.pollNext(collector);
         }
     }
@@ -117,9 +128,12 @@ public class ParallelSource<T, SplitT extends SourceSplit, 
StateT> implements Au
             executorService.shutdown();
         }
 
-        try (SourceSplitEnumerator<SplitT, StateT> closed = splitEnumerator;
-             SourceReader<T, SplitT> closedReader = reader) {
-            // just close the resources
+        if (splitEnumerator != null) {
+            splitEnumerator.close();
+        }
+
+        if (reader != null) {
+            reader.close();
         }
     }
 
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
index 02272ae0..8aab52db 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
@@ -44,7 +44,7 @@ public class SparkDataWriter<CommitInfoT, StateT> implements 
DataWriter<Internal
     private CommitInfoT latestCommitInfoT;
 
     SparkDataWriter(SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
-                    SinkCommitter<CommitInfoT> sinkCommitter,
+                    @Nullable SinkCommitter<CommitInfoT> sinkCommitter,
                     StructType schema) {
         this.sinkWriter = sinkWriter;
         this.sinkCommitter = sinkCommitter;

Reply via email to