This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 51d9af9b Use @AutoService to generate SPI file (#1873)
51d9af9b is described below

commit 51d9af9b4708f73f7251f292c68d451576b45c85
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sat May 14 14:04:09 2022 +0800

    Use @AutoService to generate SPI file (#1873)
---
 docs/en/contribution/contribute-plugin.md          | 36 ++++++++++++++--------
 pom.xml                                            | 20 ++++++++++++
 seatunnel-common/pom.xml                           |  9 ------
 .../flink/clickhouse/sink/ClickhouseBatchSink.java |  3 ++
 .../clickhouse/sink/ClickhouseFileBatchSink.java   |  8 +++--
 .../org.apache.seatunnel.flink.BaseFlinkSink       | 19 ------------
 .../seatunnel/flink/console/sink/ConsoleSink.java  |  3 ++
 .../org.apache.seatunnel.flink.BaseFlinkSink       | 18 -----------
 .../seatunnel/flink/doris/sink/DorisSink.java      |  3 ++
 .../org.apache.seatunnel.flink.BaseFlinkSink       | 18 -----------
 .../seatunnel/flink/druid/sink/DruidSink.java      |  3 ++
 .../seatunnel/flink/druid/source/DruidSource.java  |  3 ++
 .../org.apache.seatunnel.flink.BaseFlinkSink       | 18 -----------
 .../org.apache.seatunnel.flink.BaseFlinkSource     | 18 -----------
 .../flink/elasticsearch6/sink/Elasticsearch6.java  |  3 ++
 .../org.apache.seatunnel.flink.BaseFlinkSink       | 18 -----------
 .../flink/elasticsearch/sink/Elasticsearch.java    |  3 ++
 .../org.apache.seatunnel.flink.BaseFlinkSink       | 18 -----------
 .../seatunnel/flink/fake/source/FakeSource.java    |  3 ++
 .../flink/fake/source/FakeSourceStream.java        |  7 +++--
 .../org.apache.seatunnel.flink.BaseFlinkSource     | 19 ------------
 .../apache/seatunnel/flink/file/sink/FileSink.java |  3 ++
 .../seatunnel/flink/file/source/FileSource.java    |  3 ++
 .../org.apache.seatunnel.flink.BaseFlinkSink       | 18 -----------
 .../org.apache.seatunnel.flink.BaseFlinkSource     | 18 -----------
 .../apache/seatunnel/flink/http/source/Http.java   |  3 ++
 .../org.apache.seatunnel.flink.BaseFlinkSource     | 18 -----------
 .../flink/influxdb/sink/InfluxDbSink.java          |  3 ++
 .../flink/influxdb/source/InfluxDbSource.java      |  3 ++
 .../org.apache.seatunnel.flink.BaseFlinkSink       | 18 -----------
 .../org.apache.seatunnel.flink.BaseFlinkSource     | 18 -----------
 .../apache/seatunnel/flink/jdbc/sink/JdbcSink.java |  3 ++
 .../seatunnel/flink/jdbc/source/JdbcSource.java    |  3 ++
 .../org.apache.seatunnel.flink.BaseFlinkSink       | 18 -----------
 .../org.apache.seatunnel.flink.BaseFlinkSource     | 18 -----------
 .../seatunnel/flink/kafka/sink/KafkaSink.java      |  3 ++
 .../flink/kafka/source/KafkaTableStream.java       |  3 ++
 .../org.apache.seatunnel.flink.BaseFlinkSink       | 18 -----------
 .../org.apache.seatunnel.flink.BaseFlinkSource     | 18 -----------
 .../flink/socket/source/SocketStream.java          |  3 ++
 .../org.apache.seatunnel.flink.BaseFlinkSource     | 18 -----------
 41 files changed, 109 insertions(+), 351 deletions(-)

diff --git a/docs/en/contribution/contribute-plugin.md 
b/docs/en/contribution/contribute-plugin.md
index 95a95906..ae58cb74 100644
--- a/docs/en/contribution/contribute-plugin.md
+++ b/docs/en/contribution/contribute-plugin.md
@@ -11,7 +11,8 @@ Once you want to contribute a new plugin, you need to:
 Create your plugin module under the corresponding parent plugin module.
 For example, if you want to add a new Spark connector plugin, you need to 
create a new module under the `seatunnel-connectors-spark` module.
 
-```java
+```xml
+
 <project xmlns="http://maven.apache.org/POM/4.0.0";
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
@@ -46,6 +47,7 @@ The execution order of the lifecycle methods is: 
`checkConfig` -> `prepare` -> `
 ```java
 import java.util.Date;
 
+@AutoService(BaseSparkSource.class)
 public class Hello extends SparkBatchSource {
     @Override
     public Dataset<Row> getData(SparkEnvironment env) {
@@ -75,16 +77,22 @@ public class Hello extends SparkBatchSource {
     }
 }
 ```
-The `getPluginName` method is used to identify the plugin name.
 
-After you finish your implementation, you need to add a service provider to 
the `META-INF/services` file.
-The file name should be `org.apache.seatunnel.spark.BaseSparkSource` or 
`org.apache.seatunnel.spark.BaseSparkSink`, dependents on the plugin type.
-The content of the file should be the fully qualified class name of your 
implementation.
+- The `getPluginName` method is used to identify the plugin name.
+- The `@AutoService` is used to generate the 
`META-INF/services/org.apache.seatunnel.BaseSparkSource` file
+  automatically.
+
+Since this process cannot work on scala, if you use slala to implement your 
plugin, you need to add a service provider
+to the `META-INF/services` file. The file name should be 
`org.apache.seatunnel.spark.BaseSparkSource`
+or `org.apache.seatunnel.spark.BaseSparkSink`, dependents on the plugin type. 
The content of the file should be the
+fully qualified class name of your implementation.
 
 ## Add plugin to the distribution
 
-You need to add your plugin to the `seatunnel-core-spark` module, then the 
plugin will in distribution.
-```java
+You need to add your plugin to the `seatunnel-connectors-spark-dist` 
module,then the plugin will in distribution.
+
+```xml
+
 <dependency>
     <groupId>org.apache.seatunnel</groupId>
     <artifactId>seatunnel-connector-spark-hello</artifactId>
@@ -92,17 +100,20 @@ You need to add your plugin to the `seatunnel-core-spark` 
module, then the plugi
 </dependency>
 ```
 
+After you using `mvn package` to make a distribution, you can find the plugin 
in your ${distribution}/connectors/spark.
+
 # Contribute Flink Plugins
 
-The steps to contribute a Flink plugin is similar to the steps to contribute a 
Spark plugin. 
-Different from Spark, you need to add your plugin in Flink plugin modules.
+The steps to contribute a Flink plugin is similar to the steps to contribute a 
Spark plugin. Different from Spark, you
+need to add your plugin in Flink plugin modules.
 
 # Add e2e tests for your plugin
 
-Once you add a new plugin, it is recommended to add e2e tests for it. We have 
a `seatunnel-e2e` module to help you to do this.
+Once you add a new plugin, it is recommended to add e2e tests for it. We have 
a `seatunnel-e2e` module to help you to do
+this.
 
-For example, if you want to add an e2e test for your flink connector, you can 
create a new test in `seatunnel-flink-e2e` module.
-And extend the FlinkContainer class in the test.
+For example, if you want to add an e2e test for your flink connector, you can 
create a new test in `seatunnel-flink-e2e`
+module. And extend the FlinkContainer class in the test.
 
 ```java
 public class HellpSourceIT extends FlinkContainer {
@@ -113,6 +124,7 @@ public class HellpSourceIT extends FlinkContainer {
         Assert.assertEquals(0, execResult.getExitCode());
         // do some other assertion here
     }
+}
 
 ```
 Once your class implements the `FlinkContainer` interface, it will auto create 
a Flink cluster in Docker, and you just need to
diff --git a/pom.xml b/pom.xml
index 199fb63c..ad36d244 100644
--- a/pom.xml
+++ b/pom.xml
@@ -171,6 +171,7 @@
         <elasticsearch>7</elasticsearch>
         <slf4j.version>1.7.25</slf4j.version>
         <guava.version>19.0</guava.version>
+        <auto-service.version>1.0.1</auto-service.version>
     </properties>
 
     <dependencyManagement>
@@ -606,6 +607,25 @@
         </dependencies>
     </dependencyManagement>
 
+    <dependencies>
+        <dependency>
+            <groupId>com.google.auto.service</groupId>
+            <artifactId>auto-service</artifactId>
+            <version>${auto-service.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
     <build>
 
         
<finalName>${project.artifactId}-${project.version}-${scala.version}</finalName>
diff --git a/seatunnel-common/pom.xml b/seatunnel-common/pom.xml
index cd8db778..da0b169d 100644
--- a/seatunnel-common/pom.xml
+++ b/seatunnel-common/pom.xml
@@ -38,11 +38,6 @@
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.projectlombok</groupId>
-            <artifactId>lombok</artifactId>
-            <scope>provided</scope>
-        </dependency>
 
         <dependency>
             <groupId>org.apache.commons</groupId>
@@ -53,10 +48,6 @@
             <artifactId>guava</artifactId>
         </dependency>
 
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
index 620e921e..8966cc4b 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
@@ -32,6 +32,7 @@ import static 
org.apache.seatunnel.flink.clickhouse.ConfigKey.USERNAME;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.BaseFlinkSink;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchSink;
 import org.apache.seatunnel.flink.clickhouse.pojo.Shard;
@@ -41,6 +42,7 @@ import 
org.apache.seatunnel.flink.clickhouse.sink.client.ClickhouseClient;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
+import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableMap;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.types.Row;
@@ -55,6 +57,7 @@ import java.util.List;
 import java.util.Map;
 
 @SuppressWarnings("magicnumber")
+@AutoService(BaseFlinkSink.class)
 public class ClickhouseBatchSink implements FlinkBatchSink {
 
     private ShardMetadata shardMetadata;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileBatchSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileBatchSink.java
index 573f3ec8..bbe98588 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileBatchSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileBatchSink.java
@@ -30,7 +30,9 @@ import static 
org.apache.seatunnel.flink.clickhouse.ConfigKey.USERNAME;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.BaseFlinkSink;
 import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSink;
 import org.apache.seatunnel.flink.clickhouse.pojo.ClickhouseFileCopyMethod;
 import org.apache.seatunnel.flink.clickhouse.pojo.IntHolder;
 import org.apache.seatunnel.flink.clickhouse.pojo.Shard;
@@ -40,6 +42,7 @@ import 
org.apache.seatunnel.flink.clickhouse.sink.client.ClickhouseClient;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
+import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableMap;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.Partitioner;
@@ -61,7 +64,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class ClickhouseFileBatchSink extends ClickhouseBatchSink {
+@AutoService(BaseFlinkSink.class)
+public class ClickhouseFileBatchSink implements FlinkBatchSink {
 
     private Config config;
     private ShardMetadata shardMetadata;
@@ -178,7 +182,7 @@ public class ClickhouseFileBatchSink extends 
ClickhouseBatchSink {
 
     @Override
     public void close() {
-        super.close();
+        // do nothing
     }
 
     @Override
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index acd3d6a5..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.clickhouse.sink.ClickhouseBatchSink
-org.apache.seatunnel.flink.clickhouse.sink.ClickhouseFileBatchSink
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
index a7394c12..bf178aa3 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
@@ -18,12 +18,14 @@
 package org.apache.seatunnel.flink.console.sink;
 
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSink;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchSink;
 import org.apache.seatunnel.flink.stream.FlinkStreamSink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.google.auto.service.AutoService;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.configuration.Configuration;
@@ -32,6 +34,7 @@ import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@AutoService(BaseFlinkSink.class)
 public class ConsoleSink extends RichOutputFormat<Row> implements 
FlinkBatchSink, FlinkStreamSink {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsoleSink.class);
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index 8d3ae3fc..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.console.sink.ConsoleSink
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisSink.java
index 32a078a1..07cfa6a0 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisSink.java
@@ -20,12 +20,14 @@ package org.apache.seatunnel.flink.doris.sink;
 import org.apache.seatunnel.common.PropertiesUtil;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSink;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchSink;
 import org.apache.seatunnel.flink.stream.FlinkStreamSink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.google.auto.service.AutoService;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -39,6 +41,7 @@ import org.apache.flink.util.Preconditions;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+@AutoService(BaseFlinkSink.class)
 public class DorisSink implements FlinkStreamSink, FlinkBatchSink {
 
     private static final long serialVersionUID = 4747849769146047770L;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index 6c5fe8a9..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.doris.sink.DorisSink
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidSink.java
index 3dce7620..628ed549 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidSink.java
@@ -19,15 +19,18 @@ package org.apache.seatunnel.flink.druid.sink;
 
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSink;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchSink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.google.auto.service.AutoService;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.types.Row;
 
+@AutoService(BaseFlinkSink.class)
 public class DruidSink implements FlinkBatchSink {
 
     private static final long serialVersionUID = -2967782261362988646L;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
index 26e9eda2..83478f57 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
@@ -29,11 +29,13 @@ import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INF
 
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSource;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchSource;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.google.auto.service.AutoService;
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
@@ -52,6 +54,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 
+@AutoService(BaseFlinkSource.class)
 public class DruidSource implements FlinkBatchSource {
 
     private static final long serialVersionUID = 8152628883440481281L;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index 1e1c9c1b..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.druid.sink.DruidSink
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
deleted file mode 100644
index 8d000cb7..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.druid.source.DruidSource
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/Elasticsearch6.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/Elasticsearch6.java
index bdc1bd7d..d97075cb 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/Elasticsearch6.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/Elasticsearch6.java
@@ -29,6 +29,7 @@ import static 
org.apache.seatunnel.flink.elasticsearch6.config.Config.PARALLELIS
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.utils.StringTemplate;
+import org.apache.seatunnel.flink.BaseFlinkSink;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchSink;
 import org.apache.seatunnel.flink.stream.FlinkStreamSink;
@@ -36,6 +37,7 @@ import org.apache.seatunnel.flink.stream.FlinkStreamSink;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
+import com.google.auto.service.AutoService;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -52,6 +54,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+@AutoService(BaseFlinkSink.class)
 public class Elasticsearch6 implements FlinkStreamSink, FlinkBatchSink {
 
     private static final long serialVersionUID = 8445868321245456793L;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index 3e5fe44d..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.elasticsearch6.sink.Elasticsearch6
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
index c31427f9..a8e55b72 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
@@ -27,6 +27,7 @@ import static 
org.apache.seatunnel.flink.elasticsearch.config.Config.PARALLELISM
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.utils.StringTemplate;
+import org.apache.seatunnel.flink.BaseFlinkSink;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchSink;
 import org.apache.seatunnel.flink.stream.FlinkStreamSink;
@@ -34,6 +35,7 @@ import org.apache.seatunnel.flink.stream.FlinkStreamSink;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
+import com.google.auto.service.AutoService;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -50,6 +52,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+@AutoService(BaseFlinkSink.class)
 public class Elasticsearch implements FlinkStreamSink, FlinkBatchSink {
 
     private static final long serialVersionUID = 8445868321245456793L;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index e6d1cf22..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.elasticsearch.sink.Elasticsearch
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSource.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSource.java
index 2501d1fe..311b1bc7 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSource.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSource.java
@@ -18,11 +18,13 @@
 package org.apache.seatunnel.flink.fake.source;
 
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSource;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchSource;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.google.auto.service.AutoService;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.types.Row;
@@ -31,6 +33,7 @@ import java.util.Arrays;
 import java.util.Random;
 import java.util.stream.Collectors;
 
+@AutoService(BaseFlinkSource.class)
 public class FakeSource implements FlinkBatchSource {
 
     private static final String[] NAME_ARRAY = new String[]{"Gary", "Ricky 
Huo", "Kid Xiong"};
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSourceStream.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSourceStream.java
index c62a4677..2a3995d2 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSourceStream.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSourceStream.java
@@ -20,20 +20,23 @@ package org.apache.seatunnel.flink.fake.source;
 import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
 import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
 
+import org.apache.seatunnel.flink.BaseFlinkSource;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.stream.FlinkStreamSource;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.google.auto.service.AutoService;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.types.Row;
 
 import java.util.concurrent.TimeUnit;
 
+@AutoService(BaseFlinkSource.class)
 public class FakeSourceStream extends RichParallelSourceFunction<Row> 
implements FlinkStreamSource {
 
     private static final long serialVersionUID = -3026082767246767679L;
@@ -69,7 +72,7 @@ public class FakeSourceStream extends 
RichParallelSourceFunction<Row> implements
     private static final String[] NAME_ARRAY = new String[]{"Gary", "Ricky 
Huo", "Kid Xiong"};
 
     @Override
-    public void run(SourceContext<Row> ctx) throws Exception {
+    public void run(SourceFunction.SourceContext<Row> ctx) throws Exception {
         while (running) {
             int randomNum = (int) (1 + Math.random() * NAME_ARRAY.length);
             Row row = Row.of(NAME_ARRAY[randomNum - 1], 
System.currentTimeMillis());
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
deleted file mode 100644
index 948e5ea6..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.fake.source.FakeSource
-org.apache.seatunnel.flink.fake.source.FakeSourceStream
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
index 2735143b..b8d3400c 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.utils.VariablesSubstitute;
+import org.apache.seatunnel.flink.BaseFlinkSink;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchSink;
 import org.apache.seatunnel.flink.enums.FormatType;
@@ -28,6 +29,7 @@ import org.apache.seatunnel.flink.stream.FlinkStreamSink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.google.auto.service.AutoService;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.DataSet;
@@ -46,6 +48,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.TimeUnit;
 
+@AutoService(BaseFlinkSink.class)
 public class FileSink implements FlinkStreamSink, FlinkBatchSink {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(FileSink.class);
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
index 02a88440..eafae0ae 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.flink.file.source;
 
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSource;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchSource;
 import org.apache.seatunnel.flink.enums.FormatType;
@@ -28,6 +29,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.TypeReference;
+import com.google.auto.service.AutoService;
 import org.apache.avro.Schema;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -45,6 +47,7 @@ import org.apache.parquet.schema.MessageType;
 import java.util.List;
 import java.util.Map;
 
+@AutoService(BaseFlinkSource.class)
 public class FileSource implements FlinkBatchSource {
 
     private static final long serialVersionUID = -5206798549756998426L;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index 523a2304..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.file.sink.FileSink
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
deleted file mode 100644
index 2bf4b8a6..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.file.source.FileSource
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java
index fc4d6068..c0dce094 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.flink.http.source;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.BaseFlinkSource;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchSource;
 import org.apache.seatunnel.flink.http.source.constant.Settings;
@@ -29,6 +30,7 @@ import 
org.apache.seatunnel.flink.http.source.util.HttpClientUtils;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.auto.service.AutoService;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.DataSource;
@@ -43,6 +45,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+@AutoService(BaseFlinkSource.class)
 public class Http implements FlinkBatchSource {
 
     private static final String GET = "GET";
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
deleted file mode 100644
index c884fff7..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.http.source.Http
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/sink/InfluxDbSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/sink/InfluxDbSink.java
index b983eccd..da27266b 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/sink/InfluxDbSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/sink/InfluxDbSink.java
@@ -19,17 +19,20 @@ package org.apache.seatunnel.flink.influxdb.sink;
 
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSink;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchSink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.google.auto.service.AutoService;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.types.Row;
 
 import java.util.List;
 
+@AutoService(BaseFlinkSink.class)
 public class InfluxDbSink implements FlinkBatchSink {
 
     private static final long serialVersionUID = 7358988750295693096L;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/source/InfluxDbSource.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/source/InfluxDbSource.java
index 73514c50..7368193b 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/source/InfluxDbSource.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/source/InfluxDbSource.java
@@ -26,11 +26,13 @@ import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INF
 
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSource;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchSource;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.google.auto.service.AutoService;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -42,6 +44,7 @@ import org.apache.flink.types.Row;
 import java.util.HashMap;
 import java.util.List;
 
+@AutoService(BaseFlinkSource.class)
 public class InfluxDbSource implements FlinkBatchSource {
 
     private Config config;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index 4e4d84c2..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.influxdb.sink.InfluxDbSink
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
deleted file mode 100644
index ed8d57d5..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.influxdb.source.InfluxDbSource
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
index 786efb75..1ccb3cb9 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
@@ -31,12 +31,14 @@ import static 
org.apache.seatunnel.flink.jdbc.Config.USERNAME;
 
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSink;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchSink;
 import org.apache.seatunnel.flink.stream.FlinkStreamSink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.google.auto.service.AutoService;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
@@ -58,6 +60,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Arrays;
 
+@AutoService(BaseFlinkSink.class)
 public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(JdbcSink.class);
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
index 0eee8c08..e7433f79 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
@@ -35,6 +35,7 @@ import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.SHORT_TYPE_INFO
 
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSource;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchSource;
 import org.apache.seatunnel.flink.jdbc.input.DefaultTypeInformationMap;
@@ -46,6 +47,7 @@ import 
org.apache.seatunnel.flink.jdbc.input.TypeInformationMap;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.google.auto.service.AutoService;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
@@ -67,6 +69,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 
+@AutoService(BaseFlinkSource.class)
 public class JdbcSource implements FlinkBatchSource {
 
     private static final long serialVersionUID = -3349505356339446415L;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index 6611df9e..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.jdbc.sink.JdbcSink
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
deleted file mode 100644
index 614b84b4..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.jdbc.source.JdbcSource
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/sink/KafkaSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/sink/KafkaSink.java
index 2c1c6a43..40bc5262 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/sink/KafkaSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/sink/KafkaSink.java
@@ -20,11 +20,13 @@ package org.apache.seatunnel.flink.kafka.sink;
 import org.apache.seatunnel.common.PropertiesUtil;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSink;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.stream.FlinkStreamSink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.google.auto.service.AutoService;
 import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
@@ -32,6 +34,7 @@ import org.apache.flink.types.Row;
 
 import java.util.Properties;
 
+@AutoService(BaseFlinkSink.class)
 public class KafkaSink implements FlinkStreamSink {
     private static final long serialVersionUID = 3980751499724935230L;
     private static final String DEFAULT_KAFKA_SEMANTIC = "at_least_once";
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
index 1aacf290..69951f9f 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.common.PropertiesUtil;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.BaseFlinkSource;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.enums.FormatType;
 import org.apache.seatunnel.flink.stream.FlinkStreamSource;
@@ -31,6 +32,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.parser.Feature;
+import com.google.auto.service.AutoService;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.Table;
@@ -46,6 +48,7 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.Properties;
 
+@AutoService(BaseFlinkSource.class)
 public class KafkaTableStream implements FlinkStreamSource {
 
     private static final long   serialVersionUID = 5287018194573371428L;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index 81122604..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.kafka.sink.KafkaSink
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
deleted file mode 100644
index dc101ab0..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.kafka.source.KafkaTableStream
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/socket/source/SocketStream.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/socket/source/SocketStream.java
index 35151430..4e9f96a0 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/socket/source/SocketStream.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/socket/source/SocketStream.java
@@ -17,11 +17,13 @@
 
 package org.apache.seatunnel.flink.socket.source;
 
+import org.apache.seatunnel.flink.BaseFlinkSource;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.stream.FlinkStreamSource;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.google.auto.service.AutoService;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.scala.typeutils.Types;
@@ -29,6 +31,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.types.Row;
 
+@AutoService(BaseFlinkSource.class)
 public class SocketStream implements FlinkStreamSource {
 
     private static final long serialVersionUID = 986629276153771291L;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
deleted file mode 100644
index b9099c0e..00000000
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.socket.source.SocketStream

Reply via email to