This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 51d9af9b Use @AutoService to generate SPI file (#1873)
51d9af9b is described below
commit 51d9af9b4708f73f7251f292c68d451576b45c85
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sat May 14 14:04:09 2022 +0800
Use @AutoService to generate SPI file (#1873)
---
docs/en/contribution/contribute-plugin.md | 36 ++++++++++++++--------
pom.xml | 20 ++++++++++++
seatunnel-common/pom.xml | 9 ------
.../flink/clickhouse/sink/ClickhouseBatchSink.java | 3 ++
.../clickhouse/sink/ClickhouseFileBatchSink.java | 8 +++--
.../org.apache.seatunnel.flink.BaseFlinkSink | 19 ------------
.../seatunnel/flink/console/sink/ConsoleSink.java | 3 ++
.../org.apache.seatunnel.flink.BaseFlinkSink | 18 -----------
.../seatunnel/flink/doris/sink/DorisSink.java | 3 ++
.../org.apache.seatunnel.flink.BaseFlinkSink | 18 -----------
.../seatunnel/flink/druid/sink/DruidSink.java | 3 ++
.../seatunnel/flink/druid/source/DruidSource.java | 3 ++
.../org.apache.seatunnel.flink.BaseFlinkSink | 18 -----------
.../org.apache.seatunnel.flink.BaseFlinkSource | 18 -----------
.../flink/elasticsearch6/sink/Elasticsearch6.java | 3 ++
.../org.apache.seatunnel.flink.BaseFlinkSink | 18 -----------
.../flink/elasticsearch/sink/Elasticsearch.java | 3 ++
.../org.apache.seatunnel.flink.BaseFlinkSink | 18 -----------
.../seatunnel/flink/fake/source/FakeSource.java | 3 ++
.../flink/fake/source/FakeSourceStream.java | 7 +++--
.../org.apache.seatunnel.flink.BaseFlinkSource | 19 ------------
.../apache/seatunnel/flink/file/sink/FileSink.java | 3 ++
.../seatunnel/flink/file/source/FileSource.java | 3 ++
.../org.apache.seatunnel.flink.BaseFlinkSink | 18 -----------
.../org.apache.seatunnel.flink.BaseFlinkSource | 18 -----------
.../apache/seatunnel/flink/http/source/Http.java | 3 ++
.../org.apache.seatunnel.flink.BaseFlinkSource | 18 -----------
.../flink/influxdb/sink/InfluxDbSink.java | 3 ++
.../flink/influxdb/source/InfluxDbSource.java | 3 ++
.../org.apache.seatunnel.flink.BaseFlinkSink | 18 -----------
.../org.apache.seatunnel.flink.BaseFlinkSource | 18 -----------
.../apache/seatunnel/flink/jdbc/sink/JdbcSink.java | 3 ++
.../seatunnel/flink/jdbc/source/JdbcSource.java | 3 ++
.../org.apache.seatunnel.flink.BaseFlinkSink | 18 -----------
.../org.apache.seatunnel.flink.BaseFlinkSource | 18 -----------
.../seatunnel/flink/kafka/sink/KafkaSink.java | 3 ++
.../flink/kafka/source/KafkaTableStream.java | 3 ++
.../org.apache.seatunnel.flink.BaseFlinkSink | 18 -----------
.../org.apache.seatunnel.flink.BaseFlinkSource | 18 -----------
.../flink/socket/source/SocketStream.java | 3 ++
.../org.apache.seatunnel.flink.BaseFlinkSource | 18 -----------
41 files changed, 109 insertions(+), 351 deletions(-)
diff --git a/docs/en/contribution/contribute-plugin.md
b/docs/en/contribution/contribute-plugin.md
index 95a95906..ae58cb74 100644
--- a/docs/en/contribution/contribute-plugin.md
+++ b/docs/en/contribution/contribute-plugin.md
@@ -11,7 +11,8 @@ Once you want to contribute a new plugin, you need to:
Create your plugin module under the corresponding parent plugin module.
For example, if you want to add a new Spark connector plugin, you need to
create a new module under the `seatunnel-connectors-spark` module.
-```java
+```xml
+
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -46,6 +47,7 @@ The execution order of the lifecycle methods is:
`checkConfig` -> `prepare` -> `
```java
import java.util.Date;
+@AutoService(BaseSparkSource.class)
public class Hello extends SparkBatchSource {
@Override
public Dataset<Row> getData(SparkEnvironment env) {
@@ -75,16 +77,22 @@ public class Hello extends SparkBatchSource {
}
}
```
-The `getPluginName` method is used to identify the plugin name.
-After you finish your implementation, you need to add a service provider to
the `META-INF/services` file.
-The file name should be `org.apache.seatunnel.spark.BaseSparkSource` or
`org.apache.seatunnel.spark.BaseSparkSink`, dependents on the plugin type.
-The content of the file should be the fully qualified class name of your
implementation.
+- The `getPluginName` method is used to identify the plugin name.
+- The `@AutoService` is used to generate the
`META-INF/services/org.apache.seatunnel.BaseSparkSource` file
+ automatically.
+
+Since this process cannot work on scala, if you use slala to implement your
plugin, you need to add a service provider
+to the `META-INF/services` file. The file name should be
`org.apache.seatunnel.spark.BaseSparkSource`
+or `org.apache.seatunnel.spark.BaseSparkSink`, dependents on the plugin type.
The content of the file should be the
+fully qualified class name of your implementation.
## Add plugin to the distribution
-You need to add your plugin to the `seatunnel-core-spark` module, then the
plugin will in distribution.
-```java
+You need to add your plugin to the `seatunnel-connectors-spark-dist`
module,then the plugin will in distribution.
+
+```xml
+
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-connector-spark-hello</artifactId>
@@ -92,17 +100,20 @@ You need to add your plugin to the `seatunnel-core-spark`
module, then the plugi
</dependency>
```
+After you using `mvn package` to make a distribution, you can find the plugin
in your ${distribution}/connectors/spark.
+
# Contribute Flink Plugins
-The steps to contribute a Flink plugin is similar to the steps to contribute a
Spark plugin.
-Different from Spark, you need to add your plugin in Flink plugin modules.
+The steps to contribute a Flink plugin is similar to the steps to contribute a
Spark plugin. Different from Spark, you
+need to add your plugin in Flink plugin modules.
# Add e2e tests for your plugin
-Once you add a new plugin, it is recommended to add e2e tests for it. We have
a `seatunnel-e2e` module to help you to do this.
+Once you add a new plugin, it is recommended to add e2e tests for it. We have
a `seatunnel-e2e` module to help you to do
+this.
-For example, if you want to add an e2e test for your flink connector, you can
create a new test in `seatunnel-flink-e2e` module.
-And extend the FlinkContainer class in the test.
+For example, if you want to add an e2e test for your flink connector, you can
create a new test in `seatunnel-flink-e2e`
+module. And extend the FlinkContainer class in the test.
```java
public class HellpSourceIT extends FlinkContainer {
@@ -113,6 +124,7 @@ public class HellpSourceIT extends FlinkContainer {
Assert.assertEquals(0, execResult.getExitCode());
// do some other assertion here
}
+}
```
Once your class implements the `FlinkContainer` interface, it will auto create
a Flink cluster in Docker, and you just need to
diff --git a/pom.xml b/pom.xml
index 199fb63c..ad36d244 100644
--- a/pom.xml
+++ b/pom.xml
@@ -171,6 +171,7 @@
<elasticsearch>7</elasticsearch>
<slf4j.version>1.7.25</slf4j.version>
<guava.version>19.0</guava.version>
+ <auto-service.version>1.0.1</auto-service.version>
</properties>
<dependencyManagement>
@@ -606,6 +607,25 @@
</dependencies>
</dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <version>${auto-service.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
<build>
<finalName>${project.artifactId}-${project.version}-${scala.version}</finalName>
diff --git a/seatunnel-common/pom.xml b/seatunnel-common/pom.xml
index cd8db778..da0b169d 100644
--- a/seatunnel-common/pom.xml
+++ b/seatunnel-common/pom.xml
@@ -38,11 +38,6 @@
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <scope>provided</scope>
- </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
@@ -53,10 +48,6 @@
<artifactId>guava</artifactId>
</dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </dependency>
</dependencies>
</project>
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
index 620e921e..8966cc4b 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
@@ -32,6 +32,7 @@ import static
org.apache.seatunnel.flink.clickhouse.ConfigKey.USERNAME;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.BaseFlinkSink;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSink;
import org.apache.seatunnel.flink.clickhouse.pojo.Shard;
@@ -41,6 +42,7 @@ import
org.apache.seatunnel.flink.clickhouse.sink.client.ClickhouseClient;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableMap;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.types.Row;
@@ -55,6 +57,7 @@ import java.util.List;
import java.util.Map;
@SuppressWarnings("magicnumber")
+@AutoService(BaseFlinkSink.class)
public class ClickhouseBatchSink implements FlinkBatchSink {
private ShardMetadata shardMetadata;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileBatchSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileBatchSink.java
index 573f3ec8..bbe98588 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileBatchSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileBatchSink.java
@@ -30,7 +30,9 @@ import static
org.apache.seatunnel.flink.clickhouse.ConfigKey.USERNAME;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.BaseFlinkSink;
import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSink;
import org.apache.seatunnel.flink.clickhouse.pojo.ClickhouseFileCopyMethod;
import org.apache.seatunnel.flink.clickhouse.pojo.IntHolder;
import org.apache.seatunnel.flink.clickhouse.pojo.Shard;
@@ -40,6 +42,7 @@ import
org.apache.seatunnel.flink.clickhouse.sink.client.ClickhouseClient;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableMap;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.Partitioner;
@@ -61,7 +64,8 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-public class ClickhouseFileBatchSink extends ClickhouseBatchSink {
+@AutoService(BaseFlinkSink.class)
+public class ClickhouseFileBatchSink implements FlinkBatchSink {
private Config config;
private ShardMetadata shardMetadata;
@@ -178,7 +182,7 @@ public class ClickhouseFileBatchSink extends
ClickhouseBatchSink {
@Override
public void close() {
- super.close();
+ // do nothing
}
@Override
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index acd3d6a5..00000000
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.clickhouse.sink.ClickhouseBatchSink
-org.apache.seatunnel.flink.clickhouse.sink.ClickhouseFileBatchSink
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
index a7394c12..bf178aa3 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
@@ -18,12 +18,14 @@
package org.apache.seatunnel.flink.console.sink;
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSink;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSink;
import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.google.auto.service.AutoService;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.configuration.Configuration;
@@ -32,6 +34,7 @@ import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@AutoService(BaseFlinkSink.class)
public class ConsoleSink extends RichOutputFormat<Row> implements
FlinkBatchSink, FlinkStreamSink {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConsoleSink.class);
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index 8d3ae3fc..00000000
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.console.sink.ConsoleSink
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisSink.java
index 32a078a1..07cfa6a0 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisSink.java
@@ -20,12 +20,14 @@ package org.apache.seatunnel.flink.doris.sink;
import org.apache.seatunnel.common.PropertiesUtil;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSink;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSink;
import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.google.auto.service.AutoService;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -39,6 +41,7 @@ import org.apache.flink.util.Preconditions;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+@AutoService(BaseFlinkSink.class)
public class DorisSink implements FlinkStreamSink, FlinkBatchSink {
private static final long serialVersionUID = 4747849769146047770L;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index 6c5fe8a9..00000000
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.doris.sink.DorisSink
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidSink.java
index 3dce7620..628ed549 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidSink.java
@@ -19,15 +19,18 @@ package org.apache.seatunnel.flink.druid.sink;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSink;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.google.auto.service.AutoService;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.types.Row;
+@AutoService(BaseFlinkSink.class)
public class DruidSink implements FlinkBatchSink {
private static final long serialVersionUID = -2967782261362988646L;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
index 26e9eda2..83478f57 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
@@ -29,11 +29,13 @@ import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INF
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSource;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSource;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.google.auto.service.AutoService;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
@@ -52,6 +54,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
+@AutoService(BaseFlinkSource.class)
public class DruidSource implements FlinkBatchSource {
private static final long serialVersionUID = 8152628883440481281L;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index 1e1c9c1b..00000000
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.druid.sink.DruidSink
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
deleted file mode 100644
index 8d000cb7..00000000
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.druid.source.DruidSource
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/Elasticsearch6.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/Elasticsearch6.java
index bdc1bd7d..d97075cb 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/Elasticsearch6.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/Elasticsearch6.java
@@ -29,6 +29,7 @@ import static
org.apache.seatunnel.flink.elasticsearch6.config.Config.PARALLELIS
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.utils.StringTemplate;
+import org.apache.seatunnel.flink.BaseFlinkSink;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSink;
import org.apache.seatunnel.flink.stream.FlinkStreamSink;
@@ -36,6 +37,7 @@ import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import com.google.auto.service.AutoService;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -52,6 +54,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+@AutoService(BaseFlinkSink.class)
public class Elasticsearch6 implements FlinkStreamSink, FlinkBatchSink {
private static final long serialVersionUID = 8445868321245456793L;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index 3e5fe44d..00000000
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.elasticsearch6.sink.Elasticsearch6
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
index c31427f9..a8e55b72 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/Elasticsearch.java
@@ -27,6 +27,7 @@ import static
org.apache.seatunnel.flink.elasticsearch.config.Config.PARALLELISM
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.utils.StringTemplate;
+import org.apache.seatunnel.flink.BaseFlinkSink;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSink;
import org.apache.seatunnel.flink.stream.FlinkStreamSink;
@@ -34,6 +35,7 @@ import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import com.google.auto.service.AutoService;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -50,6 +52,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+@AutoService(BaseFlinkSink.class)
public class Elasticsearch implements FlinkStreamSink, FlinkBatchSink {
private static final long serialVersionUID = 8445868321245456793L;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index e6d1cf22..00000000
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.elasticsearch.sink.Elasticsearch
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSource.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSource.java
index 2501d1fe..311b1bc7 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSource.java
@@ -18,11 +18,13 @@
package org.apache.seatunnel.flink.fake.source;
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSource;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSource;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.google.auto.service.AutoService;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.types.Row;
@@ -31,6 +33,7 @@ import java.util.Arrays;
import java.util.Random;
import java.util.stream.Collectors;
+@AutoService(BaseFlinkSource.class)
public class FakeSource implements FlinkBatchSource {
private static final String[] NAME_ARRAY = new String[]{"Gary", "Ricky
Huo", "Kid Xiong"};
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSourceStream.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSourceStream.java
index c62a4677..2a3995d2 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSourceStream.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/fake/source/FakeSourceStream.java
@@ -20,20 +20,23 @@ package org.apache.seatunnel.flink.fake.source;
import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import org.apache.seatunnel.flink.BaseFlinkSource;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.stream.FlinkStreamSource;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.google.auto.service.AutoService;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.Row;
import java.util.concurrent.TimeUnit;
+@AutoService(BaseFlinkSource.class)
public class FakeSourceStream extends RichParallelSourceFunction<Row>
implements FlinkStreamSource {
private static final long serialVersionUID = -3026082767246767679L;
@@ -69,7 +72,7 @@ public class FakeSourceStream extends
RichParallelSourceFunction<Row> implements
private static final String[] NAME_ARRAY = new String[]{"Gary", "Ricky
Huo", "Kid Xiong"};
@Override
- public void run(SourceContext<Row> ctx) throws Exception {
+ public void run(SourceFunction.SourceContext<Row> ctx) throws Exception {
while (running) {
int randomNum = (int) (1 + Math.random() * NAME_ARRAY.length);
Row row = Row.of(NAME_ARRAY[randomNum - 1],
System.currentTimeMillis());
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
deleted file mode 100644
index 948e5ea6..00000000
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.fake.source.FakeSource
-org.apache.seatunnel.flink.fake.source.FakeSourceStream
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
index 2735143b..b8d3400c 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.utils.VariablesSubstitute;
+import org.apache.seatunnel.flink.BaseFlinkSink;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSink;
import org.apache.seatunnel.flink.enums.FormatType;
@@ -28,6 +29,7 @@ import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.google.auto.service.AutoService;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.DataSet;
@@ -46,6 +48,7 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
+@AutoService(BaseFlinkSink.class)
public class FileSink implements FlinkStreamSink, FlinkBatchSink {
private static final Logger LOGGER =
LoggerFactory.getLogger(FileSink.class);
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
index 02a88440..eafae0ae 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.flink.file.source;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSource;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSource;
import org.apache.seatunnel.flink.enums.FormatType;
@@ -28,6 +29,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
+import com.google.auto.service.AutoService;
import org.apache.avro.Schema;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -45,6 +47,7 @@ import org.apache.parquet.schema.MessageType;
import java.util.List;
import java.util.Map;
+@AutoService(BaseFlinkSource.class)
public class FileSource implements FlinkBatchSource {
private static final long serialVersionUID = -5206798549756998426L;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index 523a2304..00000000
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.file.sink.FileSink
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
deleted file mode 100644
index 2bf4b8a6..00000000
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.file.source.FileSource
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java
index fc4d6068..c0dce094 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.flink.http.source;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.BaseFlinkSource;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSource;
import org.apache.seatunnel.flink.http.source.constant.Settings;
@@ -29,6 +30,7 @@ import
org.apache.seatunnel.flink.http.source.util.HttpClientUtils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.auto.service.AutoService;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
@@ -43,6 +45,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+@AutoService(BaseFlinkSource.class)
public class Http implements FlinkBatchSource {
private static final String GET = "GET";
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
deleted file mode 100644
index c884fff7..00000000
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.http.source.Http
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/sink/InfluxDbSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/sink/InfluxDbSink.java
index b983eccd..da27266b 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/sink/InfluxDbSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/sink/InfluxDbSink.java
@@ -19,17 +19,20 @@ package org.apache.seatunnel.flink.influxdb.sink;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSink;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.google.auto.service.AutoService;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.types.Row;
import java.util.List;
+@AutoService(BaseFlinkSink.class)
public class InfluxDbSink implements FlinkBatchSink {
private static final long serialVersionUID = 7358988750295693096L;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/source/InfluxDbSource.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/source/InfluxDbSource.java
index 73514c50..7368193b 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/source/InfluxDbSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/java/org/apache/seatunnel/flink/influxdb/source/InfluxDbSource.java
@@ -26,11 +26,13 @@ import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INF
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSource;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSource;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.google.auto.service.AutoService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -42,6 +44,7 @@ import org.apache.flink.types.Row;
import java.util.HashMap;
import java.util.List;
+@AutoService(BaseFlinkSource.class)
public class InfluxDbSource implements FlinkBatchSource {
private Config config;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index 4e4d84c2..00000000
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.influxdb.sink.InfluxDbSink
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
deleted file mode 100644
index ed8d57d5..00000000
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-influxdb/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.influxdb.source.InfluxDbSource
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
index 786efb75..1ccb3cb9 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
@@ -31,12 +31,14 @@ import static
org.apache.seatunnel.flink.jdbc.Config.USERNAME;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSink;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSink;
import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.google.auto.service.AutoService;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
@@ -58,6 +60,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
+@AutoService(BaseFlinkSink.class)
public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
private static final Logger LOGGER =
LoggerFactory.getLogger(JdbcSink.class);
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
index 0eee8c08..e7433f79 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
@@ -35,6 +35,7 @@ import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.SHORT_TYPE_INFO
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSource;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSource;
import org.apache.seatunnel.flink.jdbc.input.DefaultTypeInformationMap;
@@ -46,6 +47,7 @@ import
org.apache.seatunnel.flink.jdbc.input.TypeInformationMap;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.google.auto.service.AutoService;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
@@ -67,6 +69,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
+@AutoService(BaseFlinkSource.class)
public class JdbcSource implements FlinkBatchSource {
private static final long serialVersionUID = -3349505356339446415L;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index 6611df9e..00000000
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.jdbc.sink.JdbcSink
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
deleted file mode 100644
index 614b84b4..00000000
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.jdbc.source.JdbcSource
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/sink/KafkaSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/sink/KafkaSink.java
index 2c1c6a43..40bc5262 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/sink/KafkaSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/sink/KafkaSink.java
@@ -20,11 +20,13 @@ package org.apache.seatunnel.flink.kafka.sink;
import org.apache.seatunnel.common.PropertiesUtil;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.BaseFlinkSink;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.google.auto.service.AutoService;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
@@ -32,6 +34,7 @@ import org.apache.flink.types.Row;
import java.util.Properties;
+@AutoService(BaseFlinkSink.class)
public class KafkaSink implements FlinkStreamSink {
private static final long serialVersionUID = 3980751499724935230L;
private static final String DEFAULT_KAFKA_SEMANTIC = "at_least_once";
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
index 1aacf290..69951f9f 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.common.PropertiesUtil;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.BaseFlinkSource;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.enums.FormatType;
import org.apache.seatunnel.flink.stream.FlinkStreamSource;
@@ -31,6 +32,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
+import com.google.auto.service.AutoService;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
@@ -46,6 +48,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Properties;
+@AutoService(BaseFlinkSource.class)
public class KafkaTableStream implements FlinkStreamSource {
private static final long serialVersionUID = 5287018194573371428L;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
deleted file mode 100644
index 81122604..00000000
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSink
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.kafka.sink.KafkaSink
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
deleted file mode 100644
index dc101ab0..00000000
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.kafka.source.KafkaTableStream
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/socket/source/SocketStream.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/socket/source/SocketStream.java
index 35151430..4e9f96a0 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/socket/source/SocketStream.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/java/org/apache/seatunnel/flink/socket/source/SocketStream.java
@@ -17,11 +17,13 @@
package org.apache.seatunnel.flink.socket.source;
+import org.apache.seatunnel.flink.BaseFlinkSource;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.stream.FlinkStreamSource;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.google.auto.service.AutoService;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.scala.typeutils.Types;
@@ -29,6 +31,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
+@AutoService(BaseFlinkSource.class)
public class SocketStream implements FlinkStreamSource {
private static final long serialVersionUID = 986629276153771291L;
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
deleted file mode 100644
index b9099c0e..00000000
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-socket/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.seatunnel.flink.socket.source.SocketStream