This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 705a2899 [Bug] [core] Cannot load plugin due to plugin name is not
class name (#1664)
705a2899 is described below
commit 705a289946c55b35451e12cb0231c44f71120841
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Apr 7 14:46:20 2022 +0800
[Bug] [core] Cannot load plugin due to plugin name is not class name (#1664)
* [Bug] [core] Cannot load plugin due to plugin name is not class name
* Fix flink plugin name
* fix spark plugin name
* Change plugin name to uppercase
---
.../src/main/java/org/apache/seatunnel/plugin/Plugin.java | 8 ++++++++
.../java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java | 6 ++++++
.../java/org/apache/seatunnel/flink/doris/sink/DorisSink.java | 5 +++++
.../java/org/apache/seatunnel/flink/druid/sink/DruidSink.java | 5 +++++
.../java/org/apache/seatunnel/flink/druid/source/DruidSource.java | 5 +++++
.../apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java | 5 +++++
.../java/org/apache/seatunnel/flink/fake/source/FakeSource.java | 5 +++++
.../org/apache/seatunnel/flink/fake/source/FakeSourceStream.java | 5 +++++
.../main/java/org/apache/seatunnel/flink/file/sink/FileSink.java | 5 +++++
.../java/org/apache/seatunnel/flink/file/source/FileSource.java | 5 +++++
.../org/apache/seatunnel/flink/influxdb/sink/InfluxDbSink.java | 5 +++++
.../apache/seatunnel/flink/influxdb/source/InfluxDbSource.java | 5 +++++
.../main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java | 5 +++++
.../java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java | 5 +++++
.../java/org/apache/seatunnel/flink/kafka/sink/KafkaSink.java | 5 +++++
.../org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java | 5 +++++
.../org/apache/seatunnel/flink/socket/source/SocketStream.java | 5 +++++
.../org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala | 2 ++
.../apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala | 2 ++
.../scala/org/apache/seatunnel/spark/console/sink/Console.scala | 2 ++
.../main/scala/org/apache/seatunnel/spark/doris/sink/Doris.scala | 7 +++++++
.../apache/seatunnel/spark/elasticsearch/sink/Elasticsearch.scala | 6 ++++++
.../seatunnel/spark/elasticsearch/source/Elasticsearch.scala | 6 ++++++
.../main/scala/org/apache/seatunnel/spark/email/sink/Email.scala | 7 +++++++
.../main/scala/org/apache/seatunnel/spark/fake/source/Fake.scala | 1 +
.../scala/org/apache/seatunnel/spark/fake/source/FakeStream.scala | 2 ++
.../main/scala/org/apache/seatunnel/spark/file/sink/File.scala | 2 ++
.../main/scala/org/apache/seatunnel/spark/file/source/File.scala | 2 ++
.../main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala | 2 ++
.../scala/org/apache/seatunnel/spark/hbase/source/Hbase.scala | 2 ++
.../main/scala/org/apache/seatunnel/spark/hive/sink/Hive.scala | 2 ++
.../main/scala/org/apache/seatunnel/spark/hive/source/Hive.scala | 2 ++
.../main/scala/org/apache/seatunnel/spark/hudi/sink/Hudi.scala | 2 ++
.../main/scala/org/apache/seatunnel/spark/hudi/source/Hudi.scala | 2 ++
.../scala/org/apache/seatunnel/spark/iceberg/sink/Iceberg.scala | 2 ++
.../scala/org/apache/seatunnel/spark/iceberg/source/Iceberg.scala | 1 +
.../main/scala/org/apache/seatunnel/spark/jdbc/sink/Jdbc.scala | 2 ++
.../main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala | 2 ++
.../main/scala/org/apache/seatunnel/spark/kafka/sink/Kafka.scala | 2 ++
.../org/apache/seatunnel/spark/kafka/source/KafkaStream.scala | 2 ++
.../main/scala/org/apache/seatunnel/spark/kudu/sink/Kudu.scala | 2 ++
.../main/scala/org/apache/seatunnel/spark/kudu/source/Kudu.scala | 1 +
.../scala/org/apache/seatunnel/spark/mongodb/sink/MongoDB.scala | 1 +
.../scala/org/apache/seatunnel/spark/mongodb/source/MongoDB.scala | 1 +
.../scala/org/apache/seatunnel/spark/neo4j/source/Neo4j.scala | 2 ++
.../scala/org/apache/seatunnel/spark/phoenix/sink/Phoenix.scala | 1 +
.../scala/org/apache/seatunnel/spark/phoenix/source/Phoenix.scala | 1 +
.../main/scala/org/apache/seatunnel/spark/redis/sink/Redis.scala | 2 ++
.../scala/org/apache/seatunnel/spark/redis/source/Redis.scala | 1 +
.../org/apache/seatunnel/spark/socket/source/SocketStream.scala | 1 +
.../main/scala/org/apache/seatunnel/spark/tidb/sink/Tidb.scala | 1 +
.../main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala | 2 ++
.../src/main/java/org/apache/seatunnel/config/PluginFactory.java | 3 +--
.../src/main/java/org/apache/seatunnel/flink/transform/Split.java | 4 ++++
.../src/main/java/org/apache/seatunnel/flink/transform/Sql.java | 5 +++++
.../main/scala/org/apache/seatunnel/spark/transform/Json.scala | 2 ++
.../main/scala/org/apache/seatunnel/spark/transform/Split.scala | 2 ++
.../src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala | 2 ++
58 files changed, 183 insertions(+), 2 deletions(-)
diff --git
a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
index 35237b6d..6dd2ac44 100644
---
a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
+++
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
@@ -71,4 +71,12 @@ public interface Plugin<T extends RuntimeEnv> extends
Serializable, AutoCloseabl
}
+ /**
+ * Return the plugin name, this is used in seatunnel conf DSL.
+ *
+ * @return plugin name.
+ */
+ default String getPluginName() {
+ return this.getClass().getSimpleName();
+ }
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
index 44c5e052..96aa1080 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
@@ -94,4 +94,10 @@ public class ConsoleSink extends RichOutputFormat<Row>
implements FlinkBatchSink
public void close() {
}
+
+ @Override
+ public String getPluginName() {
+ return "ConsoleSink";
+ }
+
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisSink.java
index d63d7600..9d20ed64 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisSink.java
@@ -98,6 +98,11 @@ public class DorisSink implements FlinkStreamSink,
FlinkBatchSink {
PropertiesUtil.setProperties(config, streamLoadProp, producerPrefix,
false);
}
+ @Override
+ public String getPluginName() {
+ return "DorisSink";
+ }
+
@Override
public DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row>
dataSet) {
batchIntervalMs = 0;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidSink.java
index 46c7e55e..0cb086e8 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidSink.java
@@ -78,4 +78,9 @@ public class DruidSink implements FlinkBatchSink {
this.timestampFormat = config.hasPath(TIMESTAMP_FORMAT) ?
config.getString(TIMESTAMP_FORMAT) : null;
this.timestampMissingValue = config.hasPath(TIMESTAMP_MISSING_VALUE) ?
config.getString(TIMESTAMP_MISSING_VALUE) : null;
}
+
+ @Override
+ public String getPluginName() {
+ return "DruidSink";
+ }
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
index 24565a5b..26e9eda2 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
@@ -128,6 +128,11 @@ public class DruidSource implements FlinkBatchSource {
.finish();
}
+ @Override
+ public String getPluginName() {
+ return "DruidSource";
+ }
+
private RowTypeInfo getRowTypeInfo(String jdbcURL, String datasource,
Collection<String> userColumns) {
HashMap<String, TypeInformation> map = new LinkedHashMap<>();
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
index 100ce9c8..529a0860 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
@@ -84,6 +84,11 @@ public class Elasticsearch implements FlinkStreamSink,
FlinkBatchSink {
config = config.withFallback(defaultConfig);
}
+ @Override
+ public String getPluginName() {
+ return "ElasticSearch";
+ }
+
@Override
public DataStreamSink<Row> outputStream(FlinkEnvironment env,
DataStream<Row> dataStream) {
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSource.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSource.java
index 328163fb..2501d1fe 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSource.java
@@ -52,6 +52,11 @@ public class FakeSource implements FlinkBatchSource {
return CheckResult.success();
}
+ @Override
+ public String getPluginName() {
+ return "FakeSource";
+ }
+
@Override
public DataSet<Row> getData(FlinkEnvironment env) {
Random random = new Random();
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSourceStream.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSourceStream.java
index 71bfa0f4..c62a4677 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSourceStream.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSourceStream.java
@@ -61,6 +61,11 @@ public class FakeSourceStream extends
RichParallelSourceFunction<Row> implements
return config;
}
+ @Override
+ public String getPluginName() {
+ return "FakeSourceStream";
+ }
+
private static final String[] NAME_ARRAY = new String[]{"Gary", "Ricky
Huo", "Kid Xiong"};
@Override
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
index a206093d..8fb986b7 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
@@ -157,4 +157,9 @@ public class FileSink implements FlinkStreamSink,
FlinkBatchSink {
outputFormat.close();
}
}
+
+ @Override
+ public String getPluginName() {
+ return "FileSink";
+ }
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
index 4d0b2c7b..02a88440 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
@@ -118,4 +118,9 @@ public class FileSource implements FlinkBatchSource {
}
}
+
+ @Override
+ public String getPluginName() {
+ return "FileSource";
+ }
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/sink/InfluxDbSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/sink/InfluxDbSink.java
index 03621abc..b3df0085 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/sink/InfluxDbSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/sink/InfluxDbSink.java
@@ -86,4 +86,9 @@ public class InfluxDbSink implements FlinkBatchSink {
this.tags = config.getStringList(TAGS);
this.fields = config.getStringList(FIELDS);
}
+
+ @Override
+ public String getPluginName() {
+ return "InfluxDbSink";
+ }
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/source/InfluxDbSource.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/source/InfluxDbSource.java
index 8ea3e28b..73514c50 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/source/InfluxDbSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/source/InfluxDbSource.java
@@ -128,6 +128,11 @@ public class InfluxDbSource implements FlinkBatchSource {
.finish();
}
+ @Override
+ public String getPluginName() {
+ return "InfluxDbSource";
+ }
+
private RowTypeInfo getRowTypeInfo(List<String> fields, List<String>
fieldTypes) {
TypeInformation<?>[] typeInformation = new
TypeInformation<?>[fieldTypes.size()];
String[] names = new String[fieldTypes.size()];
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
index 3c3c5a8e..6829b28b 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
@@ -105,6 +105,11 @@ public class JdbcSink implements FlinkStreamSink,
FlinkBatchSink {
}
}
+ @Override
+ public String getPluginName() {
+ return "JdbcSink";
+ }
+
@Override
@Nullable
public DataStreamSink<Row> outputStream(FlinkEnvironment env,
DataStream<Row> dataStream) {
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
index 5d1a9e75..96fdc211 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
@@ -166,6 +166,11 @@ public class JdbcSource implements FlinkBatchSource {
}
}
+ @Override
+ public String getPluginName() {
+ return "JdbcSource";
+ }
+
private String extendPartitionQuerySql(String query, String column) {
Matcher matcher = COMPILE.matcher(query);
if (matcher.find()) {
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/sink/KafkaSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/sink/KafkaSink.java
index b8bf5c6f..b283e9d2 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/sink/KafkaSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/sink/KafkaSink.java
@@ -83,6 +83,11 @@ public class KafkaSink implements FlinkStreamSink {
kafkaParams.put("value.serializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");
}
+ @Override
+ public String getPluginName() {
+ return "Kafka";
+ }
+
private FlinkKafkaProducer.Semantic getSemanticEnum(String semantic) {
if ("exactly_once".equals(semantic)) {
return FlinkKafkaProducer.Semantic.EXACTLY_ONCE;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
index 5c0f7bf7..1aacf290 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
@@ -111,6 +111,11 @@ public class KafkaTableStream implements FlinkStreamSource
{
schemaInfo = JSONObject.parse(schemaContent, Feature.OrderedField);
}
+ @Override
+ public String getPluginName() {
+ return "KafkaTableStream";
+ }
+
@Override
public DataStream<Row> getData(FlinkEnvironment env) {
StreamTableEnvironment tableEnvironment =
env.getStreamTableEnvironment();
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/socket/source/SocketStream.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/socket/source/SocketStream.java
index 23993b32..35151430 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/socket/source/SocketStream.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/socket/source/SocketStream.java
@@ -71,4 +71,9 @@ public class SocketStream implements FlinkStreamSource {
port = config.getInt(PORT);
}
}
+
+ @Override
+ public String getPluginName() {
+ return "SocketStream";
+ }
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
index 12e5895a..f62714f2 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
@@ -316,6 +316,8 @@ class Clickhouse extends SparkBatchSink {
throw e
}
}
+
+ override def getPluginName: String = "Clickhouse"
}
object Clickhouse {
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
index 7deb0456..676fb5d0 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
@@ -285,6 +285,8 @@ class ClickhouseFile extends SparkBatchSink {
override def prepare(prepareEnv: SparkEnvironment): Unit = {
}
+
+ override def getPluginName: String = "ClickhouseFile"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-console/src/main/scala/org/apache/seatunnel/spark/console/sink/Console.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-console/src/main/scala/org/apache/seatunnel/spark/console/sink/Console.scala
index 99a11fbd..50ae94d4 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-console/src/main/scala/org/apache/seatunnel/spark/console/sink/Console.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-console/src/main/scala/org/apache/seatunnel/spark/console/sink/Console.scala
@@ -68,4 +68,6 @@ class Console extends SparkBatchSink {
))
config = config.withFallback(defaultConfig)
}
+
+ override def getPluginName: String = "Console"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/doris/sink/Doris.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/doris/sink/Doris.scala
index 576ef0bd..f74df925 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/doris/sink/Doris.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/doris/sink/Doris.scala
@@ -95,4 +95,11 @@ class Doris extends SparkBatchSink with Serializable {
batch_size = config.getInt(Config.BULK_SIZE)
}
}
+
+ /**
+ * Return the plugin name, this is used in seatunnel conf DSL.
+ *
+ * @return plugin name.
+ */
+ override def getPluginName: String = "Doris"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-elasticsearch/src/main/scala/org/apache/seatunnel/spark/elasticsearch/sink/Elasticsearch.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-elasticsearch/src/main/scala/org/apache/seatunnel/spark/elasticsearch/sink/Elasticsearch.scala
index 30757331..a4724186 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-elasticsearch/src/main/scala/org/apache/seatunnel/spark/elasticsearch/sink/Elasticsearch.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-elasticsearch/src/main/scala/org/apache/seatunnel/spark/elasticsearch/sink/Elasticsearch.scala
@@ -74,4 +74,10 @@ class Elasticsearch extends SparkBatchSink {
}
}
+ /**
+ * Return the plugin name, this is used in seatunnel conf DSL.
+ *
+ * @return plugin name.
+ */
+ override def getPluginName: String = "ElasticSearch"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-elasticsearch/src/main/scala/org/apache/seatunnel/spark/elasticsearch/source/Elasticsearch.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-elasticsearch/src/main/scala/org/apache/seatunnel/spark/elasticsearch/source/Elasticsearch.scala
index 6160f2a5..495a3591 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-elasticsearch/src/main/scala/org/apache/seatunnel/spark/elasticsearch/source/Elasticsearch.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-elasticsearch/src/main/scala/org/apache/seatunnel/spark/elasticsearch/source/Elasticsearch.scala
@@ -66,4 +66,10 @@ class Elasticsearch extends SparkBatchSource {
checkAllExists(config, HOSTS, INDEX)
}
+ /**
+ * Return the plugin name, this is used in seatunnel conf DSL.
+ *
+ * @return plugin name.
+ */
+ override def getPluginName: String = "ElasticSearch"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-email/src/main/scala/org/apache/seatunnel/spark/email/sink/Email.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-email/src/main/scala/org/apache/seatunnel/spark/email/sink/Email.scala
index fed4e1c2..6c31647b 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-email/src/main/scala/org/apache/seatunnel/spark/email/sink/Email.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-email/src/main/scala/org/apache/seatunnel/spark/email/sink/Email.scala
@@ -122,4 +122,11 @@ class Email extends SparkBatchSink {
val mailer: SMTPMailer = new SMTPMailer(configuration)
mailer
}
+
+ /**
+ * Return the plugin name, this is used in seatunnel conf DSL.
+ *
+ * @return plugin name.
+ */
+ override def getPluginName: String = "Email"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/fake/source/Fake.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/fake/source/Fake.scala
index cc59c6a6..180ad17a 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/fake/source/Fake.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/fake/source/Fake.scala
@@ -37,4 +37,5 @@ class Fake extends SparkBatchSource {
env.getSparkSession.createDataset(s)(RowEncoder(schema))
}
+ override def getPluginName: String = "Fake"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/fake/source/FakeStream.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/fake/source/FakeStream.scala
index 66b0361a..443460a6 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/fake/source/FakeStream.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/fake/source/FakeStream.scala
@@ -60,6 +60,8 @@ class FakeStream extends SparkStreamingSource[String] {
CheckResult.error("please make sure [content] is of type string array")
}
}
+
+ override def getPluginName: String = "FakeStream"
}
private class FakeReceiver(config: Config)
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/file/sink/File.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/file/sink/File.scala
index 6142017b..37ca7afe 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/file/sink/File.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/file/sink/File.scala
@@ -69,4 +69,6 @@ class File extends SparkBatchSink {
case ORC => writer.orc(path)
}
}
+
+ override def getPluginName: String = "File"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/file/source/File.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/file/source/File.scala
index 373d7be3..04170098 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/file/source/File.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/file/source/File.scala
@@ -63,4 +63,6 @@ class File extends SparkBatchSource {
case _ => reader.format(format).load(path)
}
}
+
+ override def getPluginName: String = "File"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
index 3bdbb2d3..0e5c6d12 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
@@ -162,4 +162,6 @@ class Hbase extends SparkBatchSink with Logging {
admin.truncateTable(tableName, true)
}
}
+
+ override def getPluginName: String = "Hbase"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/source/Hbase.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/source/Hbase.scala
index d96f7d4a..e2c7d342 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/source/Hbase.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/source/Hbase.scala
@@ -49,4 +49,6 @@ class Hbase extends SparkBatchSource {
reader.load()
}
+
+ override def getPluginName: String = "Hbase"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/hive/sink/Hive.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/hive/sink/Hive.scala
index 4b43ff52..d685bfd3 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/hive/sink/Hive.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/hive/sink/Hive.scala
@@ -61,4 +61,6 @@ class Hive extends SparkBatchSink with Logging {
}
}
}
+
+ override def getPluginName: String = "Hive"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/hive/source/Hive.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/hive/source/Hive.scala
index 2577b2e5..b4dfa8f8 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/hive/source/Hive.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/hive/source/Hive.scala
@@ -31,4 +31,6 @@ class Hive extends SparkBatchSource {
override def getData(env: SparkEnvironment): Dataset[Row] = {
env.getSparkSession.sql(config.getString("pre_sql"))
}
+
+ override def getPluginName: String = "Hive"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/hudi/sink/Hudi.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/hudi/sink/Hudi.scala
index b3878e5f..32cb6cf2 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/hudi/sink/Hudi.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/hudi/sink/Hudi.scala
@@ -46,4 +46,6 @@ class Hudi extends SparkBatchSink {
writer.mode(config.getString("save_mode"))
.save(config.getString("hoodie.base.path"))
}
+
+ override def getPluginName: String = "Hudi"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/hudi/source/Hudi.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/hudi/source/Hudi.scala
index cfc6cf3e..723077f8 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/hudi/source/Hudi.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/hudi/source/Hudi.scala
@@ -39,4 +39,6 @@ class Hudi extends SparkBatchSource {
reader.load(config.getString("hoodie.datasource.read.paths"))
}
+
+ override def getPluginName: String = "Hudi"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/sink/Iceberg.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/sink/Iceberg.scala
index 234eff47..ee42b36e 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/sink/Iceberg.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/sink/Iceberg.scala
@@ -53,4 +53,6 @@ class Iceberg extends SparkBatchSink {
"saveMode" -> "append"))
config = config.withFallback(defaultConfig)
}
+
+ override def getPluginName: String = "Iceberg"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/source/Iceberg.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/source/Iceberg.scala
index 81d10808..2bed3f11 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/source/Iceberg.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/source/Iceberg.scala
@@ -47,4 +47,5 @@ class Iceberg extends SparkBatchSource {
checkAllExists(config, "path", "pre_sql")
}
+ override def getPluginName: String = "Iceberg"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/sink/Jdbc.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/sink/Jdbc.scala
index 0df841a9..a32b7906 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/sink/Jdbc.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/sink/Jdbc.scala
@@ -69,4 +69,6 @@ class Jdbc extends SparkBatchSink {
"duplicateIncs" -> ""))
config = config.withFallback(defaultConfig)
}
+
+ override def getPluginName: String = "Jdbc"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala
index 37f8c8d1..6648e019 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala
@@ -60,4 +60,6 @@ class Jdbc extends SparkBatchSource {
reader
}
+
+ override def getPluginName: String = "Jdbc"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/kafka/sink/Kafka.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/kafka/sink/Kafka.scala
index 09f5eb01..f998150b 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/kafka/sink/Kafka.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/kafka/sink/Kafka.scala
@@ -102,4 +102,6 @@ class Kafka extends SparkBatchSink with Logging {
}
}
}
+
+ override def getPluginName: String = "Kafka"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/kafka/source/KafkaStream.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/kafka/source/KafkaStream.scala
index 0cab321d..eca881e7 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/kafka/source/KafkaStream.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/kafka/source/KafkaStream.scala
@@ -124,4 +124,6 @@ class KafkaStream extends SparkStreamingSource[(String,
String)] {
}
}
}
+
+ override def getPluginName: String = "KafkaStream"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/kudu/sink/Kudu.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/kudu/sink/Kudu.scala
index 900f134c..ee606a7e 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/kudu/sink/Kudu.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/kudu/sink/Kudu.scala
@@ -53,4 +53,6 @@ class Kudu extends SparkBatchSink {
case "insertIgnore" => kuduContext.insertIgnoreRows(df, table)
}
}
+
+ override def getPluginName: String = "Kudu"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/kudu/source/Kudu.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/kudu/source/Kudu.scala
index 7710b408..36cc6871 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/kudu/source/Kudu.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/kudu/source/Kudu.scala
@@ -41,4 +41,5 @@ class Kudu extends SparkBatchSource {
ds
}
+ override def getPluginName: String = "Kudu"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/sink/MongoDB.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/sink/MongoDB.scala
index 97ea6d42..940ff940 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/sink/MongoDB.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/sink/MongoDB.scala
@@ -54,4 +54,5 @@ class MongoDB extends SparkBatchSink {
MongoSpark.save(df, writeConfig)
}
+ override def getPluginName: String = "MongoDB"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/source/MongoDB.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/source/MongoDB.scala
index 5a42a804..e565d414 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/source/MongoDB.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/source/MongoDB.scala
@@ -69,4 +69,5 @@ class MongoDB extends SparkBatchSource {
CheckConfigUtil.checkAllExists(config, "readconfig.uri",
"readconfig.database", "readconfig.collection")
}
+ override def getPluginName: String = "MongoDB"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-neo4j/src/main/scala/org/apache/seatunnel/spark/neo4j/source/Neo4j.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-neo4j/src/main/scala/org/apache/seatunnel/spark/neo4j/source/Neo4j.scala
index 68e157e8..b781bf4f 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-neo4j/src/main/scala/org/apache/seatunnel/spark/neo4j/source/Neo4j.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-neo4j/src/main/scala/org/apache/seatunnel/spark/neo4j/source/Neo4j.scala
@@ -53,4 +53,6 @@ class Neo4j extends SparkBatchSource {
neo4jConf.put(entry.getKey, config.getString(entry.getKey))
})
}
+
+ override def getPluginName: String = "Neo4j"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-phoenix/src/main/scala/org/apache/seatunnel/spark/phoenix/sink/Phoenix.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-phoenix/src/main/scala/org/apache/seatunnel/spark/phoenix/sink/Phoenix.scala
index 63c4ec65..08920c67 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-phoenix/src/main/scala/org/apache/seatunnel/spark/phoenix/sink/Phoenix.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-phoenix/src/main/scala/org/apache/seatunnel/spark/phoenix/sink/Phoenix.scala
@@ -76,4 +76,5 @@ class Phoenix extends SparkBatchSink with Logging {
}
}
+ override def getPluginName: String = "Phoenix"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-phoenix/src/main/scala/org/apache/seatunnel/spark/phoenix/source/Phoenix.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-phoenix/src/main/scala/org/apache/seatunnel/spark/phoenix/source/Phoenix.scala
index 85c964b3..5b735d1c 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-phoenix/src/main/scala/org/apache/seatunnel/spark/phoenix/source/Phoenix.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-phoenix/src/main/scala/org/apache/seatunnel/spark/phoenix/source/Phoenix.scala
@@ -74,4 +74,5 @@ class Phoenix extends SparkBatchSource with Logging {
}
}
+ override def getPluginName: String = "Phoenix"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/sink/Redis.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/sink/Redis.scala
index 7fc75487..b82304be 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/sink/Redis.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/sink/Redis.scala
@@ -108,4 +108,6 @@ class Redis extends SparkBatchSink with Logging {
val value = data.rdd.map(x => (x.getString(0), x.getString(1)))
sc.toRedisHASH(value, hashName)(redisConfig = redisConfig)
}
+
+ override def getPluginName: String = "Redis"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/source/Redis.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/source/Redis.scala
index faf59297..2cf37b9a 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/source/Redis.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/source/Redis.scala
@@ -123,4 +123,5 @@ class Redis extends SparkBatchSource {
sc.fromRedisZSet(keysOrKeyPattern, partitionNum)(redisConfig = redisConfig)
}
+ override def getPluginName: String = "Redis"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-socket/src/main/scala/org/apache/seatunnel/spark/socket/source/SocketStream.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-socket/src/main/scala/org/apache/seatunnel/spark/socket/source/SocketStream.scala
index d52088cf..daff716a 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-socket/src/main/scala/org/apache/seatunnel/spark/socket/source/SocketStream.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-socket/src/main/scala/org/apache/seatunnel/spark/socket/source/SocketStream.scala
@@ -49,4 +49,5 @@ class SocketStream extends SparkStreamingSource[String] {
sparkSession.createDataFrame(rowsRDD, schema)
}
+ override def getPluginName: String = "SocketStream"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/sink/Tidb.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/sink/Tidb.scala
index 18e74f34..64bfed39 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/sink/Tidb.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/sink/Tidb.scala
@@ -60,4 +60,5 @@ class Tidb extends SparkBatchSink {
checkAllExists(config, "addr", "port", "database", "table", "user",
"password")
}
+ override def getPluginName: String = "TiDB"
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala
index 4ec352af..99a6b684 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala
@@ -36,4 +36,6 @@ class Tidb extends SparkBatchSource {
spark.sql("use " + config.getString("database"))
spark.sql(config.getString("pre_sql"))
}
+
+ override def getPluginName: String = "TiDB"
}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginFactory.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginFactory.java
index adabccbd..fdeadc15 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginFactory.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginFactory.java
@@ -120,8 +120,7 @@ public class PluginFactory<ENVIRONMENT extends RuntimeEnv> {
for (Iterator<Plugin<?>> it = plugins.iterator(); it.hasNext(); ) {
try {
Plugin<?> plugin = it.next();
- Class<?> serviceClass = plugin.getClass();
- if (StringUtils.equalsIgnoreCase(serviceClass.getSimpleName(),
pluginName)) {
+ if (StringUtils.equalsIgnoreCase(plugin.getPluginName(),
pluginName)) {
return plugin;
}
} catch (ServiceConfigurationError e) {
diff --git
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
index 22929673..c1c95727 100644
---
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
+++
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
@@ -101,4 +101,8 @@ public class Split implements FlinkStreamTransform,
FlinkBatchTransform {
rowTypeInfo = new RowTypeInfo(types, fields.toArray(new String[0]));
}
+ @Override
+ public String getPluginName() {
+ return "split";
+ }
}
diff --git
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-sql/src/main/java/org/apache/seatunnel/flink/transform/Sql.java
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-sql/src/main/java/org/apache/seatunnel/flink/transform/Sql.java
index 1847c5ae..b0f28b20 100644
---
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-sql/src/main/java/org/apache/seatunnel/flink/transform/Sql.java
+++
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-sql/src/main/java/org/apache/seatunnel/flink/transform/Sql.java
@@ -84,4 +84,9 @@ public class Sql implements FlinkStreamTransform,
FlinkBatchTransform {
public void prepare(FlinkEnvironment env) {
sql = config.getString("sql");
}
+
+ @Override
+ public String getPluginName() {
+ return "sql";
+ }
}
diff --git
a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
index 1a2f649f..519be92b 100644
---
a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
+++
b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
@@ -135,4 +135,6 @@ class Json extends BaseSparkTransform {
this.useCustomSchema = true
}
}
+
+ override def getPluginName: String = "json"
}
diff --git
a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
index 6f88c925..498776c0 100644
---
a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
+++
b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
@@ -85,4 +85,6 @@ class Split extends BaseSparkTransform {
}
filled.toSeq
}
+
+ override def getPluginName: String = "split"
}
diff --git
a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
index 0bfa79f8..2881b1a2 100644
---
a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
+++
b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
@@ -31,4 +31,6 @@ class Sql extends BaseSparkTransform {
checkAllExists(config, "sql")
}
+ override def getPluginName: String = "sql"
+
}