This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 59e3ca53 [Api-draft] Add seatunnel kafka connector (#1940)
59e3ca53 is described below
commit 59e3ca533902c1206c4c5729ec3eae6b7959b81c
Author: TrickyZerg <[email protected]>
AuthorDate: Mon May 23 18:26:52 2022 +0800
[Api-draft] Add seatunnel kafka connector (#1940)
* add seatunnel kafka connector
* add license
---
...ginLifeCycle.java => PrepareFailException.java} | 18 ++--
.../api/common/SeaTunnelPluginLifeCycle.java | 3 +-
.../seatunnel/common/constants}/PluginType.java | 6 +-
.../seatunnel-connectors-seatunnel/pom.xml | 1 +
.../pom.xml | 24 +++--
.../connectors/seatunnel/kafka/config/Config.java | 22 ++--
.../seatunnel/kafka/source/KafkaSource.java | 92 +++++++++++++++++
.../seatunnel/kafka/source/KafkaSourceReader.java | 71 +++++++++++++
.../seatunnel/kafka/source/KafkaSourceSplit.java | 16 ++-
.../kafka/source/KafkaSourceSplitEnumerator.java | 114 +++++++++++++++++++++
.../seatunnel/kafka/state/KafkaState.java | 15 +--
.../core/base/config/AbstractExecutionContext.java | 1 +
.../core/base/config/EnvironmentFactory.java | 1 +
.../core/flink/config/FlinkExecutionContext.java | 2 +-
.../core/spark/config/SparkExecutionContext.java | 2 +-
seatunnel-dist/release-docs/LICENSE | 4 +
tools/dependencies/known-dependencies.txt | 6 +-
17 files changed, 342 insertions(+), 56 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PrepareFailException.java
similarity index 68%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PrepareFailException.java
index e0309e75..bb43d07f 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PrepareFailException.java
@@ -17,19 +17,17 @@
package org.apache.seatunnel.api.common;
+import org.apache.seatunnel.common.constants.PluginType;
+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
/**
- * This interface is the life cycle of a plugin, after a plugin created,
- * will execute prepare method to do some initialize operation.
+ * This exception is thrown when {@link
SeaTunnelPluginLifeCycle#prepare(Config)} fails.
*/
-public interface SeaTunnelPluginLifeCycle {
-
- /**
- * Use the pluginConfig to do some initialize operation.
- *
- * @param pluginConfig plugin config.
- */
- void prepare(Config pluginConfig);
+public class PrepareFailException extends RuntimeException {
+ /**
+ * Builds the exception message as
+ * "PluginName: &lt;name&gt;, PluginType: &lt;type&gt;, Message: &lt;message&gt;".
+ *
+ * @param pluginName name of the plugin, used for the "PluginName" part of the message
+ * @param type type of the plugin; {@code type.getType()} is called on it, so it must not be null
+ * @param message detail text, used for the "Message" part of the message
+ */
+ public PrepareFailException(String pluginName, PluginType type, String
message) {
+ super(String.format("PluginName: %s, PluginType: %s, Message: %s",
pluginName, type.getType(),
+ message));
+ }
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java
index e0309e75..30d7c0f5 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java
@@ -29,7 +29,8 @@ public interface SeaTunnelPluginLifeCycle {
* Use the pluginConfig to do some initialize operation.
*
* @param pluginConfig plugin config.
+ * @throws PrepareFailException if preparing the plugin fails.
*/
- void prepare(Config pluginConfig);
+ void prepare(Config pluginConfig) throws PrepareFailException;
}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/PluginType.java
similarity index 92%
copy from
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
copy to
seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/PluginType.java
index f5f30815..d02e3985 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/PluginType.java
@@ -15,9 +15,13 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.base.config;
+package org.apache.seatunnel.common.constants;
+/**
+ * The type of SeaTunnel plugin.
+ */
public enum PluginType {
+
SOURCE("source"), TRANSFORM("transform"), SINK("sink");
private final String type;
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
index 7897a316..02716bc0 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
@@ -33,5 +33,6 @@
<modules>
<module>seatunnel-connectors-seatunnel-console</module>
<module>seatunnel-connectors-seatunnel-fake</module>
+ <module>seatunnel-connectors-seatunnel-kafka</module>
</modules>
</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/pom.xml
similarity index 68%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
copy to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/pom.xml
index 7897a316..d547a463 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/pom.xml
@@ -21,17 +21,27 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>seatunnel-connectors</artifactId>
+ <artifactId>seatunnel-connectors-seatunnel</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <packaging>pom</packaging>
- <artifactId>seatunnel-connectors-seatunnel</artifactId>
+ <artifactId>seatunnel-connectors-seatunnel-kafka</artifactId>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>3.2.0</version>
+ </dependency>
+ </dependencies>
- <modules>
- <module>seatunnel-connectors-seatunnel-console</module>
- <module>seatunnel-connectors-seatunnel-fake</module>
- </modules>
</project>
\ No newline at end of file
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
similarity index 72%
copy from
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
copy to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index f5f30815..c5fddd25 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -15,18 +15,16 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.base.config;
+package org.apache.seatunnel.connectors.seatunnel.kafka.config;
-public enum PluginType {
- SOURCE("source"), TRANSFORM("transform"), SINK("sink");
+public class Config {
+ /**
+ * The topic of kafka.
+ */
+ public static final String TOPIC = "topic";
- private final String type;
-
- PluginType(String type) {
- this.type = type;
- }
-
- public String getType() {
- return type;
- }
+ /**
+ * The server address of kafka cluster.
+ */
+ public static final String BOOTSTRAP_SERVER = "bootstrap.server";
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
new file mode 100644
index 00000000..82db59e0
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVER;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.Properties;
+
+/**
+ * Kafka implementation of {@link SeaTunnelSource}. It reads the topic and bootstrap server
+ * from the plugin config and hands them to the reader and split enumerator it creates.
+ */
+public class KafkaSource implements SeaTunnelSource<SeaTunnelRow,
KafkaSourceSplit, KafkaState> {
+
+
+ // All three fields are assigned in prepare(Config); they are unset until it has been called.
+ private String topic;
+ private String bootstrapServer;
+ private Properties properties;
+
+ @Override
+ public String getPluginName() {
+ return "Kafka";
+ }
+
+ /**
+ * Checks that the TOPIC and BOOTSTRAP_SERVER keys exist in the config and stores their values.
+ *
+ * @param config plugin config to read from
+ * @throws PrepareFailException if the existence check does not succeed; the check's message is included
+ */
+ @Override
+ public void prepare(Config config) throws PrepareFailException {
+ CheckResult result = CheckConfigUtil.checkAllExists(config, TOPIC,
BOOTSTRAP_SERVER);
+ if (!result.isSuccess()) {
+ throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
+ }
+ this.topic = config.getString(TOPIC);
+ this.bootstrapServer = config.getString(BOOTSTRAP_SERVER);
+ // TODO add user custom properties
+ // Until the TODO above is done, the properties passed to the enumerator are always empty.
+ this.properties = new Properties();
+ }
+
+ /**
+ * Describes the produced rows: two STRING fields named "topic" and "raw_message".
+ */
+ @Override
+ public SeaTunnelRowTypeInfo getRowTypeInfo() {
+ return new SeaTunnelRowTypeInfo(new String[]{"topic", "raw_message"},
+ new SeaTunnelDataType[]{BasicType.STRING, BasicType.STRING});
+ }
+
+ /**
+ * Creates a reader for the configured topic and bootstrap server; readerContext is not used.
+ */
+ @Override
+ public SourceReader<SeaTunnelRow, KafkaSourceSplit>
createReader(SourceReader.Context readerContext) throws Exception {
+ return new KafkaSourceReader(this.topic, this.bootstrapServer);
+ }
+
+ /**
+ * Creates a fresh split enumerator; enumeratorContext is not used.
+ */
+ @Override
+ public SourceSplitEnumerator<KafkaSourceSplit, KafkaState>
createEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit>
enumeratorContext) throws Exception {
+ return new KafkaSourceSplitEnumerator(this.topic,
this.bootstrapServer, this.properties);
+ }
+
+ /**
+ * Currently identical to createEnumerator: checkpointState is ignored and a fresh
+ * enumerator is returned.
+ */
+ @Override
+ public SourceSplitEnumerator<KafkaSourceSplit, KafkaState>
restoreEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit>
enumeratorContext, KafkaState checkpointState) throws Exception {
+ // TODO support state restore
+ return new KafkaSourceSplitEnumerator(this.topic,
this.bootstrapServer, this.properties);
+ }
+
+ @Override
+ public Serializer<KafkaState> getEnumeratorStateSerializer() {
+ return new DefaultSerializer<>();
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
new file mode 100644
index 00000000..cb4bc889
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Skeleton {@link SourceReader} for the Kafka source. It only stores the topic and
+ * bootstrap server; every lifecycle method below is still an empty stub.
+ */
+public class KafkaSourceReader implements SourceReader<SeaTunnelRow,
KafkaSourceSplit> {
+
+ // Stored by the constructor; not read anywhere in this class yet.
+ private final String topic;
+ private final String bootstrapServer;
+
+ KafkaSourceReader(String topic, String bootstrapServer) {
+ this.topic = topic;
+ this.bootstrapServer = bootstrapServer;
+ }
+
+ // Stub: no client is created here yet.
+ @Override
+ public void open() {
+
+ }
+
+ // Stub: nothing to release because open() creates nothing.
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ // Stub: no rows are emitted to the collector.
+ @Override
+ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+
+ }
+
+ // NOTE(review): returns null rather than an empty list; confirm that callers of
+ // snapshotState tolerate null before this reader is wired into a job.
+ @Override
+ public List<KafkaSourceSplit> snapshotState(long checkpointId) throws
Exception {
+ return null;
+ }
+
+ // Stub: assigned splits are currently discarded.
+ @Override
+ public void addSplits(List<KafkaSourceSplit> splits) {
+
+ }
+
+ @Override
+ public void handleNoMoreSplits() {
+
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+ }
+}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
similarity index 74%
copy from
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
copy to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
index f5f30815..ebaa72ec 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
@@ -15,18 +15,14 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.base.config;
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
-public enum PluginType {
- SOURCE("source"), TRANSFORM("transform"), SINK("sink");
+import org.apache.seatunnel.api.source.SourceSplit;
- private final String type;
+public class KafkaSourceSplit implements SourceSplit {
- PluginType(String type) {
- this.type = type;
- }
-
- public String getType() {
- return type;
+ @Override
+ public String splitId() {
+ return null;
}
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
new file mode 100644
index 00000000..663be8f1
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Split enumerator skeleton for the Kafka source.
+ *
+ * <p>{@link #open()} creates a {@link KafkaConsumer} and an {@link AdminClient} against the
+ * configured bootstrap server, and {@link #close()} releases them. Split discovery and
+ * assignment are not implemented yet, so the remaining callbacks are empty stubs.
+ */
+public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSourceSplit, KafkaState> {
+
+    private static final String CLIENT_ID_PREFIX = "seatunnel";
+
+    private final String topic;
+    private final String bootstrapServer;
+    private final Properties properties;
+    private KafkaConsumer<byte[], byte[]> consumer;
+    private AdminClient adminClient;
+
+    /**
+     * @param topic           topic to enumerate splits for
+     * @param bootstrapServer Kafka bootstrap server address list
+     * @param properties      extra client properties supplied by the user; may be null or empty
+     */
+    KafkaSourceSplitEnumerator(String topic, String bootstrapServer, Properties properties) {
+        this.topic = topic;
+        this.bootstrapServer = bootstrapServer;
+        // Keep a private copy so later changes by the caller do not affect the clients,
+        // and so a null argument behaves like "no extra properties".
+        this.properties = new Properties();
+        if (properties != null) {
+            this.properties.putAll(properties);
+        }
+    }
+
+    /**
+     * Creates the consumer and the admin client used by this enumerator.
+     */
+    @Override
+    public void open() {
+        this.consumer = initConsumer();
+        this.adminClient = initAdminClient();
+    }
+
+    @Override
+    public void run() {
+
+    }
+
+    /**
+     * Closes the clients created in {@link #open()}. Safe to call when {@code open()}
+     * was never called or when this method was already called.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            if (consumer != null) {
+                consumer.close();
+                consumer = null;
+            }
+        } finally {
+            // Close the admin client even if closing the consumer failed.
+            if (adminClient != null) {
+                adminClient.close();
+                adminClient = null;
+            }
+        }
+    }
+
+    @Override
+    public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
+
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return 0;
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+
+    }
+
+    // NOTE(review): still returns null; confirm the checkpointing caller tolerates a null
+    // state before this enumerator is used in a job.
+    @Override
+    public KafkaState snapshotState(long checkpointId) throws Exception {
+        return null;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+
+    /**
+     * Builds the byte-array consumer. User properties are applied first so that the
+     * settings this enumerator depends on always win.
+     */
+    private KafkaConsumer<byte[], byte[]> initConsumer() {
+        Properties props = new Properties();
+        props.putAll(properties);
+
+        // Without bootstrap.servers the KafkaConsumer constructor throws a ConfigException.
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + "-enumerator-consumer");
+
+        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+            ByteArrayDeserializer.class.getName());
+        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+            ByteArrayDeserializer.class.getName());
+
+        // Disable auto create topics feature
+        props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
+        return new KafkaConsumer<>(props);
+    }
+
+    /**
+     * Builds the admin client against the same bootstrap server as the consumer.
+     */
+    private AdminClient initAdminClient() {
+        Properties props = new Properties();
+        props.putAll(properties);
+
+        // ConsumerConfig is reused here because "bootstrap.servers" and "client.id" are
+        // common client keys shared by the consumer and admin configs.
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + "-enumerator-admin-client");
+        return AdminClient.create(props);
+    }
+
+}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
similarity index 73%
rename from
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
rename to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
index f5f30815..fa6a5518 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
@@ -15,18 +15,9 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.base.config;
+package org.apache.seatunnel.connectors.seatunnel.kafka.state;
-public enum PluginType {
- SOURCE("source"), TRANSFORM("transform"), SINK("sink");
+import java.io.Serializable;
- private final String type;
-
- PluginType(String type) {
- this.type = type;
- }
-
- public String getType() {
- return type;
- }
+public class KafkaState implements Serializable {
}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
index 43ef94c4..e3ed3d72 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.apis.base.api.BaseSource;
import org.apache.seatunnel.apis.base.api.BaseTransform;
import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
index 9daa23b7..0386159c 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.base.config;
import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.spark.SparkEnvironment;
diff --git
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
index 24cb9d82..773feb87 100644
---
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkExecutionContext.java
@@ -20,9 +20,9 @@ package org.apache.seatunnel.core.flink.config;
import org.apache.seatunnel.apis.base.api.BaseSink;
import org.apache.seatunnel.apis.base.api.BaseSource;
import org.apache.seatunnel.apis.base.api.BaseTransform;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.core.base.config.AbstractExecutionContext;
import org.apache.seatunnel.core.base.config.EngineType;
-import org.apache.seatunnel.core.base.config.PluginType;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.flink.FlinkSinkPluginDiscovery;
diff --git
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
index 13d1c0f3..7effd19b 100644
---
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
+++
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkExecutionContext.java
@@ -20,9 +20,9 @@ package org.apache.seatunnel.core.spark.config;
import org.apache.seatunnel.apis.base.api.BaseSink;
import org.apache.seatunnel.apis.base.api.BaseSource;
import org.apache.seatunnel.apis.base.api.BaseTransform;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.core.base.config.AbstractExecutionContext;
import org.apache.seatunnel.core.base.config.EngineType;
-import org.apache.seatunnel.core.base.config.PluginType;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.spark.SparkSinkPluginDiscovery;
import org.apache.seatunnel.plugin.discovery.spark.SparkSourcePluginDiscovery;
diff --git a/seatunnel-dist/release-docs/LICENSE
b/seatunnel-dist/release-docs/LICENSE
index 0c696160..c95ba410 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -637,6 +637,7 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) Apache Ivy
(org.apache.ivy:ivy:2.4.0 - http://ant.apache.org/ivy/)
(The Apache Software License, Version 2.0) Apache Kafka
(org.apache.kafka:kafka-clients:2.0.0 - http://kafka.apache.org)
(The Apache Software License, Version 2.0) Apache Kafka
(org.apache.kafka:kafka-clients:2.4.1 - https://kafka.apache.org)
+ (The Apache Software License, Version 2.0) Apache Kafka
(org.apache.kafka:kafka-clients:3.2.0 - https://kafka.apache.org)
(The Apache Software License, Version 2.0) Apache Log4j
(log4j:log4j:1.2.17 - http://logging.apache.org/log4j/1.2/)
(The Apache Software License, Version 2.0) Apache Maven Wagon :: API
(org.apache.maven.wagon:wagon-provider-api:2.4 -
http://maven.apache.org/wagon/wagon-provider-api)
(The Apache Software License, Version 2.0) Apache Parquet Avro
(org.apache.parquet:parquet-avro:1.10.0 - https://parquet.apache.org)
@@ -797,6 +798,7 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) LZ4 and xxHash
(org.lz4:lz4-java:1.4.0 - https://github.com/lz4/lz4-java)
(The Apache Software License, Version 2.0) LZ4 and xxHash
(org.lz4:lz4-java:1.6.0 - https://github.com/lz4/lz4-java)
(The Apache Software License, Version 2.0) LZ4 and xxHash
(org.lz4:lz4-java:1.7.1 - https://github.com/lz4/lz4-java)
+ (The Apache Software License, Version 2.0) LZ4 and xxHash
(org.lz4:lz4-java:1.8.0 - https://github.com/lz4/lz4-java)
(The Apache Software License, Version 2.0) Log4j (log4j:log4j:1.2.14 -
http://logging.apache.org/log4j/docs/)
(The Apache Software License, Version 2.0) Maven Aether Provider
(org.apache.maven:maven-aether-provider:3.1.1 -
http://maven.apache.org/ref/3.1.1/maven-aether-provider)
(The Apache Software License, Version 2.0) Maven Model
(org.apache.maven:maven-model:3.1.1 -
http://maven.apache.org/ref/3.1.1/maven-model)
@@ -883,6 +885,7 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) snappy-java
(org.xerial.snappy:snappy-java:1.1.4 - https://github.com/xerial/snappy-java)
(The Apache Software License, Version 2.0) snappy-java
(org.xerial.snappy:snappy-java:1.1.7.1 - https://github.com/xerial/snappy-java)
(The Apache Software License, Version 2.0) snappy-java
(org.xerial.snappy:snappy-java:1.1.7.3 - https://github.com/xerial/snappy-java)
+ (The Apache Software License, Version 2.0) snappy-java
(org.xerial.snappy:snappy-java:1.1.8.4 - https://github.com/xerial/snappy-java)
(The Apache Software License, Version 2.0) transport
(org.elasticsearch.client:transport:6.3.1 -
https://github.com/elastic/elasticsearch)
(The Apache Software License, Version 2.0) transport
(org.elasticsearch.client:transport:7.5.1 -
https://github.com/elastic/elasticsearch)
(The Apache Software License, Version 2.0) transport-netty4
(org.elasticsearch.plugin:transport-netty4-client:6.3.1 -
https://github.com/elastic/elasticsearch)
@@ -930,6 +933,7 @@ The text of each license is also included at
licenses/LICENSE-[project].txt.
(BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.3.2-2 -
https://github.com/luben/zstd-jni)
(BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.3.3-1 -
https://github.com/luben/zstd-jni)
(BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.4.3-1 -
https://github.com/luben/zstd-jni)
+ (BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.5.2-1 -
https://github.com/luben/zstd-jni)
(BSD 3 Clause) Spark-Redis (com.redislabs:spark-redis_2.11:2.6.0 -
http://github.com/RedisLabs/spark-redis)
(BSD 3-Clause) I18n Utils (com.salesforce.i18n:i18n-util:1.0.4 -
https://github.com/salesforce/i18n-util)
(BSD 3-Clause) Scala Compiler (org.scala-lang:scala-compiler:2.11.12 -
http://www.scala-lang.org/)
diff --git a/tools/dependencies/known-dependencies.txt
b/tools/dependencies/known-dependencies.txt
index 05279dfa..dc1d88ff 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -437,6 +437,7 @@ jsr311-api-1.1.1.jar
jvm-attach-api-1.5.jar
kafka-clients-2.0.0.jar
kafka-clients-2.4.1.jar
+kafka-clients-3.2.0.jar
kerb-admin-1.0.1.jar
kerb-client-1.0.1.jar
kerb-common-1.0.1.jar
@@ -507,6 +508,7 @@ lz4-java-1.4.0.jar
lz4-java-1.4.1.jar
lz4-java-1.6.0.jar
lz4-java-1.7.1.jar
+lz4-java-1.8.0.jar
mapper-extras-client-7.5.1.jar
maven-aether-provider-3.1.1.jar
maven-artifact-3.6.0.jar
@@ -637,6 +639,7 @@ snappy-0.3.jar
snappy-java-1.1.4.jar
snappy-java-1.1.7.1.jar
snappy-java-1.1.8.3.jar
+snappy-java-1.1.8.4.jar
spark-catalyst_2.11-2.4.0.jar
spark-hive-thriftserver_2.11-2.3.4.jar
spark-redis_2.11-2.6.0.jar
@@ -697,4 +700,5 @@ zookeeper-3.4.6.jar
zookeeper-3.5.9.jar
zookeeper-jute-3.5.9.jar
zstd-jni-1.3.3-1.jar
-zstd-jni-1.4.3-1.jar
\ No newline at end of file
+zstd-jni-1.4.3-1.jar
+zstd-jni-1.5.2-1.jar
\ No newline at end of file