This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 705a2899 [Bug] [core] Cannot load plugin due to plugin name is not 
class name (#1664)
705a2899 is described below

commit 705a289946c55b35451e12cb0231c44f71120841
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Apr 7 14:46:20 2022 +0800

    [Bug] [core] Cannot load plugin due to plugin name is not class name (#1664)
    
    * [Bug] [core] Cannot load plugin due to plugin name is not class name
    
    * Fix flink plugin name
    
    * fix spark plugin name
    
    * Change plugin name to uppercase
---
 .../src/main/java/org/apache/seatunnel/plugin/Plugin.java         | 8 ++++++++
 .../java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java | 6 ++++++
 .../java/org/apache/seatunnel/flink/doris/sink/DorisSink.java     | 5 +++++
 .../java/org/apache/seatunnel/flink/druid/sink/DruidSink.java     | 5 +++++
 .../java/org/apache/seatunnel/flink/druid/source/DruidSource.java | 5 +++++
 .../apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java  | 5 +++++
 .../java/org/apache/seatunnel/flink/fake/source/FakeSource.java   | 5 +++++
 .../org/apache/seatunnel/flink/fake/source/FakeSourceStream.java  | 5 +++++
 .../main/java/org/apache/seatunnel/flink/file/sink/FileSink.java  | 5 +++++
 .../java/org/apache/seatunnel/flink/file/source/FileSource.java   | 5 +++++
 .../org/apache/seatunnel/flink/influxdb/sink/InfluxDbSink.java    | 5 +++++
 .../apache/seatunnel/flink/influxdb/source/InfluxDbSource.java    | 5 +++++
 .../main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java  | 5 +++++
 .../java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java   | 5 +++++
 .../java/org/apache/seatunnel/flink/kafka/sink/KafkaSink.java     | 5 +++++
 .../org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java | 5 +++++
 .../org/apache/seatunnel/flink/socket/source/SocketStream.java    | 5 +++++
 .../org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala   | 2 ++
 .../apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala   | 2 ++
 .../scala/org/apache/seatunnel/spark/console/sink/Console.scala   | 2 ++
 .../main/scala/org/apache/seatunnel/spark/doris/sink/Doris.scala  | 7 +++++++
 .../apache/seatunnel/spark/elasticsearch/sink/Elasticsearch.scala | 6 ++++++
 .../seatunnel/spark/elasticsearch/source/Elasticsearch.scala      | 6 ++++++
 .../main/scala/org/apache/seatunnel/spark/email/sink/Email.scala  | 7 +++++++
 .../main/scala/org/apache/seatunnel/spark/fake/source/Fake.scala  | 1 +
 .../scala/org/apache/seatunnel/spark/fake/source/FakeStream.scala | 2 ++
 .../main/scala/org/apache/seatunnel/spark/file/sink/File.scala    | 2 ++
 .../main/scala/org/apache/seatunnel/spark/file/source/File.scala  | 2 ++
 .../main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala  | 2 ++
 .../scala/org/apache/seatunnel/spark/hbase/source/Hbase.scala     | 2 ++
 .../main/scala/org/apache/seatunnel/spark/hive/sink/Hive.scala    | 2 ++
 .../main/scala/org/apache/seatunnel/spark/hive/source/Hive.scala  | 2 ++
 .../main/scala/org/apache/seatunnel/spark/hudi/sink/Hudi.scala    | 2 ++
 .../main/scala/org/apache/seatunnel/spark/hudi/source/Hudi.scala  | 2 ++
 .../scala/org/apache/seatunnel/spark/iceberg/sink/Iceberg.scala   | 2 ++
 .../scala/org/apache/seatunnel/spark/iceberg/source/Iceberg.scala | 1 +
 .../main/scala/org/apache/seatunnel/spark/jdbc/sink/Jdbc.scala    | 2 ++
 .../main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala  | 2 ++
 .../main/scala/org/apache/seatunnel/spark/kafka/sink/Kafka.scala  | 2 ++
 .../org/apache/seatunnel/spark/kafka/source/KafkaStream.scala     | 2 ++
 .../main/scala/org/apache/seatunnel/spark/kudu/sink/Kudu.scala    | 2 ++
 .../main/scala/org/apache/seatunnel/spark/kudu/source/Kudu.scala  | 1 +
 .../scala/org/apache/seatunnel/spark/mongodb/sink/MongoDB.scala   | 1 +
 .../scala/org/apache/seatunnel/spark/mongodb/source/MongoDB.scala | 1 +
 .../scala/org/apache/seatunnel/spark/neo4j/source/Neo4j.scala     | 2 ++
 .../scala/org/apache/seatunnel/spark/phoenix/sink/Phoenix.scala   | 1 +
 .../scala/org/apache/seatunnel/spark/phoenix/source/Phoenix.scala | 1 +
 .../main/scala/org/apache/seatunnel/spark/redis/sink/Redis.scala  | 2 ++
 .../scala/org/apache/seatunnel/spark/redis/source/Redis.scala     | 1 +
 .../org/apache/seatunnel/spark/socket/source/SocketStream.scala   | 1 +
 .../main/scala/org/apache/seatunnel/spark/tidb/sink/Tidb.scala    | 1 +
 .../main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala  | 2 ++
 .../src/main/java/org/apache/seatunnel/config/PluginFactory.java  | 3 +--
 .../src/main/java/org/apache/seatunnel/flink/transform/Split.java | 4 ++++
 .../src/main/java/org/apache/seatunnel/flink/transform/Sql.java   | 5 +++++
 .../main/scala/org/apache/seatunnel/spark/transform/Json.scala    | 2 ++
 .../main/scala/org/apache/seatunnel/spark/transform/Split.scala   | 2 ++
 .../src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala | 2 ++
 58 files changed, 183 insertions(+), 2 deletions(-)

diff --git 
a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
 
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
index 35237b6d..6dd2ac44 100644
--- 
a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
+++ 
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
@@ -71,4 +71,12 @@ public interface Plugin<T extends RuntimeEnv> extends 
Serializable, AutoCloseabl
 
     }
 
+    /**
+     * Return the plugin name, this is used in seatunnel conf DSL.
+     *
+     * @return plugin name.
+     */
+    default String getPluginName() {
+        return this.getClass().getSimpleName();
+    }
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
index 44c5e052..96aa1080 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
@@ -94,4 +94,10 @@ public class ConsoleSink extends RichOutputFormat<Row> 
implements FlinkBatchSink
     public void close() {
 
     }
+
+    @Override
+    public String getPluginName() {
+        return "ConsoleSink";
+    }
+
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisSink.java
index d63d7600..9d20ed64 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisSink.java
@@ -98,6 +98,11 @@ public class DorisSink implements FlinkStreamSink, 
FlinkBatchSink {
         PropertiesUtil.setProperties(config, streamLoadProp, producerPrefix, 
false);
     }
 
+    @Override
+    public String getPluginName() {
+        return "DorisSink";
+    }
+
     @Override
     public DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row> 
dataSet) {
         batchIntervalMs = 0;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidSink.java
index 46c7e55e..0cb086e8 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidSink.java
@@ -78,4 +78,9 @@ public class DruidSink implements FlinkBatchSink {
         this.timestampFormat = config.hasPath(TIMESTAMP_FORMAT) ? 
config.getString(TIMESTAMP_FORMAT) : null;
         this.timestampMissingValue = config.hasPath(TIMESTAMP_MISSING_VALUE) ? 
config.getString(TIMESTAMP_MISSING_VALUE) : null;
     }
+
+    @Override
+    public String getPluginName() {
+        return "DruidSink";
+    }
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
index 24565a5b..26e9eda2 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
@@ -128,6 +128,11 @@ public class DruidSource implements FlinkBatchSource {
                 .finish();
     }
 
+    @Override
+    public String getPluginName() {
+        return "DruidSource";
+    }
+
     private RowTypeInfo getRowTypeInfo(String jdbcURL, String datasource, 
Collection<String> userColumns) {
         HashMap<String, TypeInformation> map = new LinkedHashMap<>();
 
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
index 100ce9c8..529a0860 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
@@ -84,6 +84,11 @@ public class Elasticsearch implements FlinkStreamSink, 
FlinkBatchSink {
         config = config.withFallback(defaultConfig);
     }
 
+    @Override
+    public String getPluginName() {
+        return "ElasticSearch";
+    }
+
     @Override
     public DataStreamSink<Row> outputStream(FlinkEnvironment env, 
DataStream<Row> dataStream) {
 
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSource.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSource.java
index 328163fb..2501d1fe 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSource.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSource.java
@@ -52,6 +52,11 @@ public class FakeSource implements FlinkBatchSource {
         return CheckResult.success();
     }
 
+    @Override
+    public String getPluginName() {
+        return "FakeSource";
+    }
+
     @Override
     public DataSet<Row> getData(FlinkEnvironment env) {
         Random random = new Random();
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSourceStream.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSourceStream.java
index 71bfa0f4..c62a4677 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSourceStream.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSourceStream.java
@@ -61,6 +61,11 @@ public class FakeSourceStream extends 
RichParallelSourceFunction<Row> implements
         return config;
     }
 
+    @Override
+    public String getPluginName() {
+        return "FakeSourceStream";
+    }
+
     private static final String[] NAME_ARRAY = new String[]{"Gary", "Ricky 
Huo", "Kid Xiong"};
 
     @Override
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
index a206093d..8fb986b7 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
@@ -157,4 +157,9 @@ public class FileSink implements FlinkStreamSink, 
FlinkBatchSink {
             outputFormat.close();
         }
     }
+
+    @Override
+    public String getPluginName() {
+        return "FileSink";
+    }
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
index 4d0b2c7b..02a88440 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
@@ -118,4 +118,9 @@ public class FileSource implements FlinkBatchSource {
         }
 
     }
+
+    @Override
+    public String getPluginName() {
+        return "FileSource";
+    }
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/sink/InfluxDbSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/sink/InfluxDbSink.java
index 03621abc..b3df0085 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/sink/InfluxDbSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/sink/InfluxDbSink.java
@@ -86,4 +86,9 @@ public class InfluxDbSink implements FlinkBatchSink {
         this.tags = config.getStringList(TAGS);
         this.fields = config.getStringList(FIELDS);
     }
+
+    @Override
+    public String getPluginName() {
+        return "InfluxDbSink";
+    }
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/source/InfluxDbSource.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/source/InfluxDbSource.java
index 8ea3e28b..73514c50 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/source/InfluxDbSource.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/source/InfluxDbSource.java
@@ -128,6 +128,11 @@ public class InfluxDbSource implements FlinkBatchSource {
                 .finish();
     }
 
+    @Override
+    public String getPluginName() {
+        return "InfluxDbSource";
+    }
+
     private RowTypeInfo getRowTypeInfo(List<String> fields, List<String> 
fieldTypes) {
         TypeInformation<?>[] typeInformation = new 
TypeInformation<?>[fieldTypes.size()];
         String[] names = new String[fieldTypes.size()];
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
index 3c3c5a8e..6829b28b 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
@@ -105,6 +105,11 @@ public class JdbcSink implements FlinkStreamSink, 
FlinkBatchSink {
         }
     }
 
+    @Override
+    public String getPluginName() {
+        return "JdbcSink";
+    }
+
     @Override
     @Nullable
     public DataStreamSink<Row> outputStream(FlinkEnvironment env, 
DataStream<Row> dataStream) {
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
index 5d1a9e75..96fdc211 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
@@ -166,6 +166,11 @@ public class JdbcSource implements FlinkBatchSource {
         }
     }
 
+    @Override
+    public String getPluginName() {
+        return "JdbcSource";
+    }
+
     private String extendPartitionQuerySql(String query, String column) {
         Matcher matcher = COMPILE.matcher(query);
         if (matcher.find()) {
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/sink/KafkaSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/sink/KafkaSink.java
index b8bf5c6f..b283e9d2 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/sink/KafkaSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/sink/KafkaSink.java
@@ -83,6 +83,11 @@ public class KafkaSink implements FlinkStreamSink {
         kafkaParams.put("value.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
     }
 
+    @Override
+    public String getPluginName() {
+        return "Kafka";
+    }
+
     private FlinkKafkaProducer.Semantic getSemanticEnum(String semantic) {
         if ("exactly_once".equals(semantic)) {
             return FlinkKafkaProducer.Semantic.EXACTLY_ONCE;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
index 5c0f7bf7..1aacf290 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
@@ -111,6 +111,11 @@ public class KafkaTableStream implements FlinkStreamSource 
{
         schemaInfo = JSONObject.parse(schemaContent, Feature.OrderedField);
     }
 
+    @Override
+    public String getPluginName() {
+        return "KafkaTableStream";
+    }
+
     @Override
     public DataStream<Row> getData(FlinkEnvironment env) {
         StreamTableEnvironment tableEnvironment = 
env.getStreamTableEnvironment();
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/socket/source/SocketStream.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/socket/source/SocketStream.java
index 23993b32..35151430 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/socket/source/SocketStream.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/socket/source/SocketStream.java
@@ -71,4 +71,9 @@ public class SocketStream implements FlinkStreamSource {
             port = config.getInt(PORT);
         }
     }
+
+    @Override
+    public String getPluginName() {
+        return "SocketStream";
+    }
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
index 12e5895a..f62714f2 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
@@ -316,6 +316,8 @@ class Clickhouse extends SparkBatchSink {
         throw e
     }
   }
+
+  override def getPluginName: String = "Clickhouse"
 }
 
 object Clickhouse {
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
index 7deb0456..676fb5d0 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
@@ -285,6 +285,8 @@ class ClickhouseFile extends SparkBatchSink {
 
   override def prepare(prepareEnv: SparkEnvironment): Unit = {
   }
+
+  override def getPluginName: String = "ClickhouseFile"
 }
 
 
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-console/src/main/scala/org/apache/seatunnel/spark/console/sink/Console.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-console/src/main/scala/org/apache/seatunnel/spark/console/sink/Console.scala
index 99a11fbd..50ae94d4 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-console/src/main/scala/org/apache/seatunnel/spark/console/sink/Console.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-console/src/main/scala/org/apache/seatunnel/spark/console/sink/Console.scala
@@ -68,4 +68,6 @@ class Console extends SparkBatchSink {
       ))
     config = config.withFallback(defaultConfig)
   }
+
+  override def getPluginName: String = "Console"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/doris/sink/Doris.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/doris/sink/Doris.scala
index 576ef0bd..f74df925 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/doris/sink/Doris.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-doris/src/main/scala/org/apache/seatunnel/spark/doris/sink/Doris.scala
@@ -95,4 +95,11 @@ class Doris extends SparkBatchSink with Serializable {
       batch_size = config.getInt(Config.BULK_SIZE)
     }
   }
+
+  /**
+   * Return the plugin name, this is used in seatunnel conf DSL.
+   *
+   * @return plugin name.
+   */
+  override def getPluginName: String = "Doris"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-elasticsearch/src/main/scala/org/apache/seatunnel/spark/elasticsearch/sink/Elasticsearch.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-elasticsearch/src/main/scala/org/apache/seatunnel/spark/elasticsearch/sink/Elasticsearch.scala
index 30757331..a4724186 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-elasticsearch/src/main/scala/org/apache/seatunnel/spark/elasticsearch/sink/Elasticsearch.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-elasticsearch/src/main/scala/org/apache/seatunnel/spark/elasticsearch/sink/Elasticsearch.scala
@@ -74,4 +74,10 @@ class Elasticsearch extends SparkBatchSink {
     }
   }
 
+  /**
+   * Return the plugin name, this is used in seatunnel conf DSL.
+   *
+   * @return plugin name.
+   */
+  override def getPluginName: String = "ElasticSearch"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-elasticsearch/src/main/scala/org/apache/seatunnel/spark/elasticsearch/source/Elasticsearch.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-elasticsearch/src/main/scala/org/apache/seatunnel/spark/elasticsearch/source/Elasticsearch.scala
index 6160f2a5..495a3591 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-elasticsearch/src/main/scala/org/apache/seatunnel/spark/elasticsearch/source/Elasticsearch.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-elasticsearch/src/main/scala/org/apache/seatunnel/spark/elasticsearch/source/Elasticsearch.scala
@@ -66,4 +66,10 @@ class Elasticsearch extends SparkBatchSource {
     checkAllExists(config, HOSTS, INDEX)
   }
 
+  /**
+   * Return the plugin name, this is used in seatunnel conf DSL.
+   *
+   * @return plugin name.
+   */
+  override def getPluginName: String = "ElasticSearch"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-email/src/main/scala/org/apache/seatunnel/spark/email/sink/Email.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-email/src/main/scala/org/apache/seatunnel/spark/email/sink/Email.scala
index fed4e1c2..6c31647b 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-email/src/main/scala/org/apache/seatunnel/spark/email/sink/Email.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-email/src/main/scala/org/apache/seatunnel/spark/email/sink/Email.scala
@@ -122,4 +122,11 @@ class Email extends SparkBatchSink {
     val mailer: SMTPMailer = new SMTPMailer(configuration)
     mailer
   }
+
+  /**
+   * Return the plugin name, this is used in seatunnel conf DSL.
+   *
+   * @return plugin name.
+   */
+  override def getPluginName: String = "Email"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/fake/source/Fake.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/fake/source/Fake.scala
index cc59c6a6..180ad17a 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/fake/source/Fake.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/fake/source/Fake.scala
@@ -37,4 +37,5 @@ class Fake extends SparkBatchSource {
     env.getSparkSession.createDataset(s)(RowEncoder(schema))
   }
 
+  override def getPluginName: String = "Fake"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/fake/source/FakeStream.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/fake/source/FakeStream.scala
index 66b0361a..443460a6 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/fake/source/FakeStream.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-fake/src/main/scala/org/apache/seatunnel/spark/fake/source/FakeStream.scala
@@ -60,6 +60,8 @@ class FakeStream extends SparkStreamingSource[String] {
       CheckResult.error("please make sure [content] is of type string array")
     }
   }
+
+  override def getPluginName: String = "FakeStream"
 }
 
 private class FakeReceiver(config: Config)
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/file/sink/File.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/file/sink/File.scala
index 6142017b..37ca7afe 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/file/sink/File.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/file/sink/File.scala
@@ -69,4 +69,6 @@ class File extends SparkBatchSink {
       case ORC => writer.orc(path)
     }
   }
+
+  override def getPluginName: String = "File"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/file/source/File.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/file/source/File.scala
index 373d7be3..04170098 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/file/source/File.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-file/src/main/scala/org/apache/seatunnel/spark/file/source/File.scala
@@ -63,4 +63,6 @@ class File extends SparkBatchSource {
       case _ => reader.format(format).load(path)
     }
   }
+
+  override def getPluginName: String = "File"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
index 3bdbb2d3..0e5c6d12 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
@@ -162,4 +162,6 @@ class Hbase extends SparkBatchSink with Logging {
       admin.truncateTable(tableName, true)
     }
   }
+
+  override def getPluginName: String = "Hbase"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/source/Hbase.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/source/Hbase.scala
index d96f7d4a..e2c7d342 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/source/Hbase.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/source/Hbase.scala
@@ -49,4 +49,6 @@ class Hbase extends SparkBatchSource {
 
     reader.load()
   }
+
+  override def getPluginName: String = "Hbase"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/hive/sink/Hive.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/hive/sink/Hive.scala
index 4b43ff52..d685bfd3 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/hive/sink/Hive.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/hive/sink/Hive.scala
@@ -61,4 +61,6 @@ class Hive extends SparkBatchSink with Logging {
       }
     }
   }
+
+  override def getPluginName: String = "Hive"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/hive/source/Hive.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/hive/source/Hive.scala
index 2577b2e5..b4dfa8f8 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/hive/source/Hive.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hive/src/main/scala/org/apache/seatunnel/spark/hive/source/Hive.scala
@@ -31,4 +31,6 @@ class Hive extends SparkBatchSource {
   override def getData(env: SparkEnvironment): Dataset[Row] = {
     env.getSparkSession.sql(config.getString("pre_sql"))
   }
+
+  override def getPluginName: String = "Hive"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/hudi/sink/Hudi.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/hudi/sink/Hudi.scala
index b3878e5f..32cb6cf2 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/hudi/sink/Hudi.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/hudi/sink/Hudi.scala
@@ -46,4 +46,6 @@ class Hudi extends SparkBatchSink {
     writer.mode(config.getString("save_mode"))
       .save(config.getString("hoodie.base.path"))
   }
+
+  override def getPluginName: String = "Hudi"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/hudi/source/Hudi.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/hudi/source/Hudi.scala
index cfc6cf3e..723077f8 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/hudi/source/Hudi.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hudi/src/main/scala/org/apache/seatunnel/spark/hudi/source/Hudi.scala
@@ -39,4 +39,6 @@ class Hudi extends SparkBatchSource {
 
     reader.load(config.getString("hoodie.datasource.read.paths"))
   }
+
+  override def getPluginName: String = "Hudi"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/sink/Iceberg.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/sink/Iceberg.scala
index 234eff47..ee42b36e 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/sink/Iceberg.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/sink/Iceberg.scala
@@ -53,4 +53,6 @@ class Iceberg extends SparkBatchSink {
         "saveMode" -> "append"))
     config = config.withFallback(defaultConfig)
   }
+
+  override def getPluginName: String = "Iceberg"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/source/Iceberg.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/source/Iceberg.scala
index 81d10808..2bed3f11 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/source/Iceberg.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/source/Iceberg.scala
@@ -47,4 +47,5 @@ class Iceberg extends SparkBatchSource {
     checkAllExists(config, "path", "pre_sql")
   }
 
+  override def getPluginName: String = "Iceberg"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/sink/Jdbc.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/sink/Jdbc.scala
index 0df841a9..a32b7906 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/sink/Jdbc.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/sink/Jdbc.scala
@@ -69,4 +69,6 @@ class Jdbc extends SparkBatchSink {
         "duplicateIncs" -> ""))
     config = config.withFallback(defaultConfig)
   }
+
+  override def getPluginName: String = "Jdbc"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala
index 37f8c8d1..6648e019 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala
@@ -60,4 +60,6 @@ class Jdbc extends SparkBatchSource {
 
     reader
   }
+
+  override def getPluginName: String = "Jdbc"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/kafka/sink/Kafka.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/kafka/sink/Kafka.scala
index 09f5eb01..f998150b 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/kafka/sink/Kafka.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/kafka/sink/Kafka.scala
@@ -102,4 +102,6 @@ class Kafka extends SparkBatchSink with Logging {
         }
     }
   }
+
+  override def getPluginName: String = "Kafka"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/kafka/source/KafkaStream.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/kafka/source/KafkaStream.scala
index 0cab321d..eca881e7 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/kafka/source/KafkaStream.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kafka/src/main/scala/org/apache/seatunnel/spark/kafka/source/KafkaStream.scala
@@ -124,4 +124,6 @@ class KafkaStream extends SparkStreamingSource[(String, 
String)] {
       }
     }
   }
+
+  override def getPluginName: String = "KafkaStream"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/kudu/sink/Kudu.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/kudu/sink/Kudu.scala
index 900f134c..ee606a7e 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/kudu/sink/Kudu.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/kudu/sink/Kudu.scala
@@ -53,4 +53,6 @@ class Kudu extends SparkBatchSink {
       case "insertIgnore" => kuduContext.insertIgnoreRows(df, table)
     }
   }
+
+  override def getPluginName: String = "Kudu"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/kudu/source/Kudu.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/kudu/source/Kudu.scala
index 7710b408..36cc6871 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/kudu/source/Kudu.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-kudu/src/main/scala/org/apache/seatunnel/spark/kudu/source/Kudu.scala
@@ -41,4 +41,5 @@ class Kudu extends SparkBatchSource {
     ds
   }
 
+  override def getPluginName: String = "Kudu"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/sink/MongoDB.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/sink/MongoDB.scala
index 97ea6d42..940ff940 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/sink/MongoDB.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/sink/MongoDB.scala
@@ -54,4 +54,5 @@ class MongoDB extends SparkBatchSink {
     MongoSpark.save(df, writeConfig)
   }
 
+  override def getPluginName: String = "MongoDB"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/source/MongoDB.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/source/MongoDB.scala
index 5a42a804..e565d414 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/source/MongoDB.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/source/MongoDB.scala
@@ -69,4 +69,5 @@ class MongoDB extends SparkBatchSource {
     CheckConfigUtil.checkAllExists(config, "readconfig.uri", 
"readconfig.database", "readconfig.collection")
   }
 
+  override def getPluginName: String = "MongoDB"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-neo4j/src/main/scala/org/apache/seatunnel/spark/neo4j/source/Neo4j.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-neo4j/src/main/scala/org/apache/seatunnel/spark/neo4j/source/Neo4j.scala
index 68e157e8..b781bf4f 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-neo4j/src/main/scala/org/apache/seatunnel/spark/neo4j/source/Neo4j.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-neo4j/src/main/scala/org/apache/seatunnel/spark/neo4j/source/Neo4j.scala
@@ -53,4 +53,6 @@ class Neo4j extends SparkBatchSource {
       neo4jConf.put(entry.getKey, config.getString(entry.getKey))
     })
   }
+
+  override def getPluginName: String = "Neo4j"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-phoenix/src/main/scala/org/apache/seatunnel/spark/phoenix/sink/Phoenix.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-phoenix/src/main/scala/org/apache/seatunnel/spark/phoenix/sink/Phoenix.scala
index 63c4ec65..08920c67 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-phoenix/src/main/scala/org/apache/seatunnel/spark/phoenix/sink/Phoenix.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-phoenix/src/main/scala/org/apache/seatunnel/spark/phoenix/sink/Phoenix.scala
@@ -76,4 +76,5 @@ class Phoenix extends SparkBatchSink with Logging {
     }
   }
 
+  override def getPluginName: String = "Phoenix"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-phoenix/src/main/scala/org/apache/seatunnel/spark/phoenix/source/Phoenix.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-phoenix/src/main/scala/org/apache/seatunnel/spark/phoenix/source/Phoenix.scala
index 85c964b3..5b735d1c 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-phoenix/src/main/scala/org/apache/seatunnel/spark/phoenix/source/Phoenix.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-phoenix/src/main/scala/org/apache/seatunnel/spark/phoenix/source/Phoenix.scala
@@ -74,4 +74,5 @@ class Phoenix extends SparkBatchSource with Logging {
     }
   }
 
+  override def getPluginName: String = "Phoenix"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/sink/Redis.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/sink/Redis.scala
index 7fc75487..b82304be 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/sink/Redis.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/sink/Redis.scala
@@ -108,4 +108,6 @@ class Redis extends SparkBatchSink with Logging {
     val value = data.rdd.map(x => (x.getString(0), x.getString(1)))
     sc.toRedisHASH(value, hashName)(redisConfig = redisConfig)
   }
+
+  override def getPluginName: String = "Redis"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/source/Redis.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/source/Redis.scala
index faf59297..2cf37b9a 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/source/Redis.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-redis/src/main/scala/org/apache/seatunnel/spark/redis/source/Redis.scala
@@ -123,4 +123,5 @@ class Redis extends SparkBatchSource {
     sc.fromRedisZSet(keysOrKeyPattern, partitionNum)(redisConfig = redisConfig)
   }
 
+  override def getPluginName: String = "Redis"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-socket/src/main/scala/org/apache/seatunnel/spark/socket/source/SocketStream.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-socket/src/main/scala/org/apache/seatunnel/spark/socket/source/SocketStream.scala
index d52088cf..daff716a 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-socket/src/main/scala/org/apache/seatunnel/spark/socket/source/SocketStream.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-socket/src/main/scala/org/apache/seatunnel/spark/socket/source/SocketStream.scala
@@ -49,4 +49,5 @@ class SocketStream extends SparkStreamingSource[String] {
     sparkSession.createDataFrame(rowsRDD, schema)
   }
 
+  override def getPluginName: String = "SocketStream"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/sink/Tidb.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/sink/Tidb.scala
index 18e74f34..64bfed39 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/sink/Tidb.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/sink/Tidb.scala
@@ -60,4 +60,5 @@ class Tidb extends SparkBatchSink {
     checkAllExists(config, "addr", "port", "database", "table", "user", 
"password")
   }
 
+  override def getPluginName: String = "TiDB"
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala
index 4ec352af..99a6b684 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-tidb/src/main/scala/org/apache/seatunnel/spark/tidb/source/Tidb.scala
@@ -36,4 +36,6 @@ class Tidb extends SparkBatchSource {
     spark.sql("use " + config.getString("database"))
     spark.sql(config.getString("pre_sql"))
   }
+
+  override def getPluginName: String = "TiDB"
 }
diff --git 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginFactory.java
 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginFactory.java
index adabccbd..fdeadc15 100644
--- 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginFactory.java
+++ 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginFactory.java
@@ -120,8 +120,7 @@ public class PluginFactory<ENVIRONMENT extends RuntimeEnv> {
         for (Iterator<Plugin<?>> it = plugins.iterator(); it.hasNext(); ) {
             try {
                 Plugin<?> plugin = it.next();
-                Class<?> serviceClass = plugin.getClass();
-                if (StringUtils.equalsIgnoreCase(serviceClass.getSimpleName(), 
pluginName)) {
+                if (StringUtils.equalsIgnoreCase(plugin.getPluginName(), 
pluginName)) {
                     return plugin;
                 }
             } catch (ServiceConfigurationError e) {
diff --git 
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
 
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
index 22929673..c1c95727 100644
--- 
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
+++ 
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-split/src/main/java/org/apache/seatunnel/flink/transform/Split.java
@@ -101,4 +101,8 @@ public class Split implements FlinkStreamTransform, 
FlinkBatchTransform {
         rowTypeInfo = new RowTypeInfo(types, fields.toArray(new String[0]));
     }
 
+    @Override
+    public String getPluginName() {
+        return "split";
+    }
 }
diff --git 
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-sql/src/main/java/org/apache/seatunnel/flink/transform/Sql.java
 
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-sql/src/main/java/org/apache/seatunnel/flink/transform/Sql.java
index 1847c5ae..b0f28b20 100644
--- 
a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-sql/src/main/java/org/apache/seatunnel/flink/transform/Sql.java
+++ 
b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-sql/src/main/java/org/apache/seatunnel/flink/transform/Sql.java
@@ -84,4 +84,9 @@ public class Sql implements FlinkStreamTransform, 
FlinkBatchTransform {
     public void prepare(FlinkEnvironment env) {
         sql = config.getString("sql");
     }
+
+    @Override
+    public String getPluginName() {
+        return "sql";
+    }
 }
diff --git 
a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
 
b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
index 1a2f649f..519be92b 100644
--- 
a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
+++ 
b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
@@ -135,4 +135,6 @@ class Json extends BaseSparkTransform {
       this.useCustomSchema = true
     }
   }
+
+  override def getPluginName: String = "json"
 }
diff --git 
a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
 
b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
index 6f88c925..498776c0 100644
--- 
a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
+++ 
b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
@@ -85,4 +85,6 @@ class Split extends BaseSparkTransform {
     }
     filled.toSeq
   }
+
+  override def getPluginName: String = "split"
 }
diff --git 
a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
 
b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
index 0bfa79f8..2881b1a2 100644
--- 
a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
+++ 
b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
@@ -31,4 +31,6 @@ class Sql extends BaseSparkTransform {
     checkAllExists(config, "sql")
   }
 
+  override def getPluginName: String = "sql"
+
 }

Reply via email to