This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 59e3ca53 [Api-draft] Add seatunnel kafka connector (#1940)
59e3ca53 is described below

commit 59e3ca533902c1206c4c5729ec3eae6b7959b81c
Author: TrickyZerg <[email protected]>
AuthorDate: Mon May 23 18:26:52 2022 +0800

    [Api-draft] Add seatunnel kafka connector (#1940)
    
    * add seatunnel kafka connector
    * add license
---
 ...ginLifeCycle.java => PrepareFailException.java} |  18 ++--
 .../api/common/SeaTunnelPluginLifeCycle.java       |   3 +-
 .../seatunnel/common/constants}/PluginType.java    |   6 +-
 .../seatunnel-connectors-seatunnel/pom.xml         |   1 +
 .../pom.xml                                        |  24 +++--
 .../connectors/seatunnel/kafka/config/Config.java  |  22 ++--
 .../seatunnel/kafka/source/KafkaSource.java        |  92 +++++++++++++++++
 .../seatunnel/kafka/source/KafkaSourceReader.java  |  71 +++++++++++++
 .../seatunnel/kafka/source/KafkaSourceSplit.java   |  16 ++-
 .../kafka/source/KafkaSourceSplitEnumerator.java   | 114 +++++++++++++++++++++
 .../seatunnel/kafka/state/KafkaState.java          |  15 +--
 .../core/base/config/AbstractExecutionContext.java |   1 +
 .../core/base/config/EnvironmentFactory.java       |   1 +
 .../core/flink/config/FlinkExecutionContext.java   |   2 +-
 .../core/spark/config/SparkExecutionContext.java   |   2 +-
 seatunnel-dist/release-docs/LICENSE                |   4 +
 tools/dependencies/known-dependencies.txt          |   6 +-
 17 files changed, 342 insertions(+), 56 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PrepareFailException.java
similarity index 68%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java
copy to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PrepareFailException.java
index e0309e75..bb43d07f 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PrepareFailException.java
@@ -17,19 +17,17 @@
 
 package org.apache.seatunnel.api.common;
 
+import org.apache.seatunnel.common.constants.PluginType;
+
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 /**
- * This interface is the life cycle of a plugin, after a plugin created,
- * will execute prepare method to do some initialize operation.
+ * This exception will throw when {@link 
SeaTunnelPluginLifeCycle#prepare(Config)} failed.
  */
-public interface SeaTunnelPluginLifeCycle {
-
-    /**
-     * Use the pluginConfig to do some initialize operation.
-     *
-     * @param pluginConfig plugin config.
-     */
-    void prepare(Config pluginConfig);
+public class PrepareFailException extends RuntimeException {
 
+    public PrepareFailException(String pluginName, PluginType type, String 
message) {
+        super(String.format("PluginName: %s, PluginType: %s, Message: %s", 
pluginName, type.getType(),
+                message));
+    }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java
index e0309e75..30d7c0f5 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java
@@ -29,7 +29,8 @@ public interface SeaTunnelPluginLifeCycle {
      * Use the pluginConfig to do some initialize operation.
      *
      * @param pluginConfig plugin config.
+     * @throws PrepareFailException if plugin prepare failed, the {@link 
PrepareFailException} will throw.
      */
-    void prepare(Config pluginConfig);
+    void prepare(Config pluginConfig) throws PrepareFailException;
 
 }
diff --git 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/PluginType.java
similarity index 92%
copy from 
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
copy to 
seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/PluginType.java
index f5f30815..d02e3985 100644
--- 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/PluginType.java
@@ -15,9 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.base.config;
+package org.apache.seatunnel.common.constants;
 
+/**
+ * The type of SeaTunnel plugin.
+ */
 public enum PluginType {
+
     SOURCE("source"), TRANSFORM("transform"), SINK("sink");
 
     private final String type;
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
index 7897a316..02716bc0 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
@@ -33,5 +33,6 @@
     <modules>
         <module>seatunnel-connectors-seatunnel-console</module>
         <module>seatunnel-connectors-seatunnel-fake</module>
+        <module>seatunnel-connectors-seatunnel-kafka</module>
     </modules>
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/pom.xml
similarity index 68%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
copy to 
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/pom.xml
index 7897a316..d547a463 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/pom.xml
@@ -21,17 +21,27 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <parent>
-        <artifactId>seatunnel-connectors</artifactId>
+        <artifactId>seatunnel-connectors-seatunnel</artifactId>
         <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
 
-    <artifactId>seatunnel-connectors-seatunnel</artifactId>
+    <artifactId>seatunnel-connectors-seatunnel-kafka</artifactId>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>3.2.0</version>
+        </dependency>
+    </dependencies>
 
-    <modules>
-        <module>seatunnel-connectors-seatunnel-console</module>
-        <module>seatunnel-connectors-seatunnel-fake</module>
-    </modules>
 </project>
\ No newline at end of file
diff --git 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
similarity index 72%
copy from 
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
copy to 
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index f5f30815..c5fddd25 100644
--- 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -15,18 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.base.config;
+package org.apache.seatunnel.connectors.seatunnel.kafka.config;
 
-public enum PluginType {
-    SOURCE("source"), TRANSFORM("transform"), SINK("sink");
+public class Config {
+    /**
+     * The topic of kafka.
+     */
+    public static final String TOPIC = "topic";
 
-    private final String type;
-
-    PluginType(String type) {
-        this.type = type;
-    }
-
-    public String getType() {
-        return type;
-    }
+    /**
+     * The server address of kafka cluster.
+     */
+    public static final String BOOTSTRAP_SERVER = "bootstrap.server";
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
new file mode 100644
index 00000000..82db59e0
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVER;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.Properties;
+
+public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, 
KafkaSourceSplit, KafkaState> {
+
+
+    private String topic;
+    private String bootstrapServer;
+    private Properties properties;
+
+    @Override
+    public String getPluginName() {
+        return "Kafka";
+    }
+
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(config, TOPIC, 
BOOTSTRAP_SERVER);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
+        }
+        this.topic = config.getString(TOPIC);
+        this.bootstrapServer = config.getString(BOOTSTRAP_SERVER);
+        // TODO add user custom properties
+        this.properties = new Properties();
+    }
+
+    @Override
+    public SeaTunnelRowTypeInfo getRowTypeInfo() {
+        return new SeaTunnelRowTypeInfo(new String[]{"topic", "raw_message"},
+                new SeaTunnelDataType[]{BasicType.STRING, BasicType.STRING});
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, KafkaSourceSplit> 
createReader(SourceReader.Context readerContext) throws Exception {
+        return new KafkaSourceReader(this.topic, this.bootstrapServer);
+    }
+
+    @Override
+    public SourceSplitEnumerator<KafkaSourceSplit, KafkaState> 
createEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit> 
enumeratorContext) throws Exception {
+        return new KafkaSourceSplitEnumerator(this.topic, 
this.bootstrapServer, this.properties);
+    }
+
+    @Override
+    public SourceSplitEnumerator<KafkaSourceSplit, KafkaState> 
restoreEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit> 
enumeratorContext, KafkaState checkpointState) throws Exception {
+        // TODO support state restore
+        return new KafkaSourceSplitEnumerator(this.topic, 
this.bootstrapServer, this.properties);
+    }
+
+    @Override
+    public Serializer<KafkaState> getEnumeratorStateSerializer() {
+        return new DefaultSerializer<>();
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
new file mode 100644
index 00000000..cb4bc889
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.io.IOException;
+import java.util.List;
+
+public class KafkaSourceReader implements SourceReader<SeaTunnelRow, 
KafkaSourceSplit> {
+
+    private final String topic;
+    private final String bootstrapServer;
+
+    KafkaSourceReader(String topic, String bootstrapServer) {
+        this.topic = topic;
+        this.bootstrapServer = bootstrapServer;
+    }
+
+    @Override
+    public void open() {
+
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+
+    }
+
+    @Override
+    public List<KafkaSourceSplit> snapshotState(long checkpointId) throws 
Exception {
+        return null;
+    }
+
+    @Override
+    public void addSplits(List<KafkaSourceSplit> splits) {
+
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+}
diff --git 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
similarity index 74%
copy from 
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
copy to 
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
index f5f30815..ebaa72ec 100644
--- 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
@@ -15,18 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.base.config;
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
 
-public enum PluginType {
-    SOURCE("source"), TRANSFORM("transform"), SINK("sink");
+import org.apache.seatunnel.api.source.SourceSplit;
 
-    private final String type;
+public class KafkaSourceSplit implements SourceSplit {
 
-    PluginType(String type) {
-        this.type = type;
-    }
-
-    public String getType() {
-        return type;
+    @Override
+    public String splitId() {
+        return null;
     }
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
new file mode 100644
index 00000000..663be8f1
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+public class KafkaSourceSplitEnumerator implements 
SourceSplitEnumerator<KafkaSourceSplit, KafkaState> {
+
+    private static final String CLIENT_ID_PREFIX = "seatunnel";
+
+    private final String topic;
+    private final String bootstrapServer;
+    private KafkaConsumer<byte[], byte[]> consumer;
+    private AdminClient adminClient;
+
+    KafkaSourceSplitEnumerator(String topic, String bootstrapServer, 
Properties properties) {
+        this.topic = topic;
+        this.bootstrapServer = bootstrapServer;
+    }
+
+    @Override
+    public void open() {
+        this.consumer = initConsumer();
+        this.adminClient = initAdminClient();
+    }
+
+    @Override
+    public void run() {
+
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
+
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return 0;
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+
+    }
+
+    @Override
+    public KafkaState snapshotState(long checkpointId) throws Exception {
+        return null;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+
+    private KafkaConsumer<byte[], byte[]> initConsumer() {
+        Properties props = new Properties();
+
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + 
"-enumerator-consumer");
+
+        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+
+        // Disable auto create topics feature
+        props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, 
"false");
+        return new KafkaConsumer<>(props);
+    }
+
+    private AdminClient initAdminClient() {
+        Properties props = new Properties();
+
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + 
"-enumerator-admin-client");
+        return AdminClient.create(props);
+    }
+
+}
diff --git 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
similarity index 73%
rename from 
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
rename to 
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
index f5f30815..fa6a5518 100644
--- 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
@@ -15,18 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.base.config;
+package org.apache.seatunnel.connectors.seatunnel.kafka.state;
 
-public enum PluginType {
-    SOURCE("source"), TRANSFORM("transform"), SINK("sink");
+import java.io.Serializable;
 
-    private final String type;
-
-    PluginType(String type) {
-        this.type = type;
-    }
-
-    public String getType() {
-        return type;
-    }
+public class KafkaState implements Serializable {
 }
diff --git 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
index 43ef94c4..e3ed3d72 100644
--- 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
+++ 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.apis.base.api.BaseSource;
 import org.apache.seatunnel.apis.base.api.BaseTransform;
 import org.apache.seatunnel.apis.base.env.RuntimeEnv;
 import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
diff --git 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
index 9daa23b7..0386159c 100644
--- 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
+++ 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.base.config;
 
 import org.apache.seatunnel.apis.base.env.RuntimeEnv;
 import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.spark.SparkEnvironment;
 
diff --git 
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
 
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
index 24cb9d82..773feb87 100644
--- 
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
+++ 
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
@@ -20,9 +20,9 @@ package org.apache.seatunnel.core.flink.config;
 import org.apache.seatunnel.apis.base.api.BaseSink;
 import org.apache.seatunnel.apis.base.api.BaseSource;
 import org.apache.seatunnel.apis.base.api.BaseTransform;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.core.base.config.AbstractExecutionContext;
 import org.apache.seatunnel.core.base.config.EngineType;
-import org.apache.seatunnel.core.base.config.PluginType;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import org.apache.seatunnel.plugin.discovery.flink.FlinkSinkPluginDiscovery;
diff --git 
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
 
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
index 13d1c0f3..7effd19b 100644
--- 
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
+++ 
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
@@ -20,9 +20,9 @@ package org.apache.seatunnel.core.spark.config;
 import org.apache.seatunnel.apis.base.api.BaseSink;
 import org.apache.seatunnel.apis.base.api.BaseSource;
 import org.apache.seatunnel.apis.base.api.BaseTransform;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.core.base.config.AbstractExecutionContext;
 import org.apache.seatunnel.core.base.config.EngineType;
-import org.apache.seatunnel.core.base.config.PluginType;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import org.apache.seatunnel.plugin.discovery.spark.SparkSinkPluginDiscovery;
 import org.apache.seatunnel.plugin.discovery.spark.SparkSourcePluginDiscovery;
diff --git a/seatunnel-dist/release-docs/LICENSE 
b/seatunnel-dist/release-docs/LICENSE
index 0c696160..c95ba410 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -637,6 +637,7 @@ The text of each license is the standard Apache 2.0 license.
      (The Apache Software License, Version 2.0) Apache Ivy 
(org.apache.ivy:ivy:2.4.0 - http://ant.apache.org/ivy/)
      (The Apache Software License, Version 2.0) Apache Kafka 
(org.apache.kafka:kafka-clients:2.0.0 - http://kafka.apache.org)
      (The Apache Software License, Version 2.0) Apache Kafka 
(org.apache.kafka:kafka-clients:2.4.1 - https://kafka.apache.org)
+     (The Apache Software License, Version 2.0) Apache Kafka 
(org.apache.kafka:kafka-clients:3.2.1 - https://kafka.apache.org)
      (The Apache Software License, Version 2.0) Apache Log4j 
(log4j:log4j:1.2.17 - http://logging.apache.org/log4j/1.2/)
      (The Apache Software License, Version 2.0) Apache Maven Wagon :: API 
(org.apache.maven.wagon:wagon-provider-api:2.4 - 
http://maven.apache.org/wagon/wagon-provider-api)
      (The Apache Software License, Version 2.0) Apache Parquet Avro 
(org.apache.parquet:parquet-avro:1.10.0 - https://parquet.apache.org)
@@ -797,6 +798,7 @@ The text of each license is the standard Apache 2.0 license.
      (The Apache Software License, Version 2.0) LZ4 and xxHash 
(org.lz4:lz4-java:1.4.0 - https://github.com/lz4/lz4-java)
      (The Apache Software License, Version 2.0) LZ4 and xxHash 
(org.lz4:lz4-java:1.6.0 - https://github.com/lz4/lz4-java)
      (The Apache Software License, Version 2.0) LZ4 and xxHash 
(org.lz4:lz4-java:1.7.1 - https://github.com/lz4/lz4-java)
+     (The Apache Software License, Version 2.0) LZ4 and xxHash 
(org.lz4:lz4-java:1.8.0 - https://github.com/lz4/lz4-java)
      (The Apache Software License, Version 2.0) Log4j (log4j:log4j:1.2.14 - 
http://logging.apache.org/log4j/docs/)
      (The Apache Software License, Version 2.0) Maven Aether Provider 
(org.apache.maven:maven-aether-provider:3.1.1 - 
http://maven.apache.org/ref/3.1.1/maven-aether-provider)
      (The Apache Software License, Version 2.0) Maven Model 
(org.apache.maven:maven-model:3.1.1 - 
http://maven.apache.org/ref/3.1.1/maven-model)
@@ -883,6 +885,7 @@ The text of each license is the standard Apache 2.0 license.
      (The Apache Software License, Version 2.0) snappy-java 
(org.xerial.snappy:snappy-java:1.1.4 - https://github.com/xerial/snappy-java)
      (The Apache Software License, Version 2.0) snappy-java 
(org.xerial.snappy:snappy-java:1.1.7.1 - https://github.com/xerial/snappy-java)
      (The Apache Software License, Version 2.0) snappy-java 
(org.xerial.snappy:snappy-java:1.1.7.3 - https://github.com/xerial/snappy-java)
+     (The Apache Software License, Version 2.0) snappy-java 
(org.xerial.snappy:snappy-java:1.1.8.4 - https://github.com/xerial/snappy-java)
      (The Apache Software License, Version 2.0) transport 
(org.elasticsearch.client:transport:6.3.1 - 
https://github.com/elastic/elasticsearch)
      (The Apache Software License, Version 2.0) transport 
(org.elasticsearch.client:transport:7.5.1 - 
https://github.com/elastic/elasticsearch)
      (The Apache Software License, Version 2.0) transport-netty4 
(org.elasticsearch.plugin:transport-netty4-client:6.3.1 - 
https://github.com/elastic/elasticsearch)
@@ -930,6 +933,7 @@ The text of each license is also included at 
licenses/LICENSE-[project].txt.
      (BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.3.2-2 - 
https://github.com/luben/zstd-jni)
      (BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.3.3-1 - 
https://github.com/luben/zstd-jni)
      (BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.4.3-1 - 
https://github.com/luben/zstd-jni)
+     (BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.5.2-1 - 
https://github.com/luben/zstd-jni)
      (BSD 3 Clause) Spark-Redis (com.redislabs:spark-redis_2.11:2.6.0 - 
http://github.com/RedisLabs/spark-redis)
      (BSD 3-Clause) I18n Utils (com.salesforce.i18n:i18n-util:1.0.4 - 
https://github.com/salesforce/i18n-util)
      (BSD 3-Clause) Scala Compiler (org.scala-lang:scala-compiler:2.11.12 - 
http://www.scala-lang.org/)
diff --git a/tools/dependencies/known-dependencies.txt 
b/tools/dependencies/known-dependencies.txt
index 05279dfa..dc1d88ff 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -437,6 +437,7 @@ jsr311-api-1.1.1.jar
 jvm-attach-api-1.5.jar
 kafka-clients-2.0.0.jar
 kafka-clients-2.4.1.jar
+kafka-clients-3.2.0.jar
 kerb-admin-1.0.1.jar
 kerb-client-1.0.1.jar
 kerb-common-1.0.1.jar
@@ -507,6 +508,7 @@ lz4-java-1.4.0.jar
 lz4-java-1.4.1.jar
 lz4-java-1.6.0.jar
 lz4-java-1.7.1.jar
+lz4-java-1.8.0.jar
 mapper-extras-client-7.5.1.jar
 maven-aether-provider-3.1.1.jar
 maven-artifact-3.6.0.jar
@@ -637,6 +639,7 @@ snappy-0.3.jar
 snappy-java-1.1.4.jar
 snappy-java-1.1.7.1.jar
 snappy-java-1.1.8.3.jar
+snappy-java-1.1.8.4.jar
 spark-catalyst_2.11-2.4.0.jar
 spark-hive-thriftserver_2.11-2.3.4.jar
 spark-redis_2.11-2.6.0.jar
@@ -697,4 +700,5 @@ zookeeper-3.4.6.jar
 zookeeper-3.5.9.jar
 zookeeper-jute-3.5.9.jar
 zstd-jni-1.3.3-1.jar
-zstd-jni-1.4.3-1.jar
\ No newline at end of file
+zstd-jni-1.4.3-1.jar
+zstd-jni-1.5.2-1.jar
\ No newline at end of file

Reply via email to