This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new af83a302cf [improve] socket options (#9517)
af83a302cf is described below

commit af83a302cff22f8793a0b187ed6d933fc843b39d
Author: Jarvis <[email protected]>
AuthorDate: Sat Jun 28 22:56:00 2025 +0800

    [improve] socket options (#9517)
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |  2 -
 ...ConfigOptions.java => SocketCommonOptions.java} | 11 ++---
 .../config/{SinkConfig.java => SocketConfig.java}  | 18 +++----
 ...nkConfigOptions.java => SocketSinkOptions.java} |  9 +---
 ...ConfigOptions.java => SocketSourceOptions.java} | 19 +-------
 .../seatunnel/socket/sink/SocketClient.java        |  4 +-
 .../seatunnel/socket/sink/SocketSink.java          | 52 +++++---------------
 .../seatunnel/socket/sink/SocketSinkFactory.java   | 19 +++++---
 .../seatunnel/socket/sink/SocketSinkWriter.java    | 13 ++---
 .../seatunnel/socket/source/SocketSource.java      | 53 +++++++++------------
 .../socket/source/SocketSourceFactory.java         | 19 ++++++--
 .../socket/source/SocketSourceParameter.java       | 55 ----------------------
 .../socket/source/SocketSourceReader.java          |  5 +-
 13 files changed, 83 insertions(+), 196 deletions(-)

diff --git 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 7c6d791f27..47cbd7decd 100644
--- 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -195,11 +195,9 @@ public class ConnectorOptionCheckTest {
         whiteList.add("PulsarSinkOptions");
         whiteList.add("PulsarSourceOptions");
         whiteList.add("MongodbSinkOptions");
-        whiteList.add("SocketSinkOptions");
         whiteList.add("SelectDBSinkOptions");
         whiteList.add("TablestoreSinkOptions");
         whiteList.add("TableStoreDBSourceOptions");
-        whiteList.add("SocketSourceOptions");
         whiteList.add("PostgresIncrementalSourceOptions");
         whiteList.add("SqlServerIncrementalSourceOptions");
         whiteList.add("OracleIncrementalSourceOptions");
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketCommonOptions.java
similarity index 77%
copy from 
seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java
copy to 
seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketCommonOptions.java
index 2de6578fd2..12e180ddc3 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketCommonOptions.java
@@ -20,18 +20,13 @@ package 
org.apache.seatunnel.connectors.seatunnel.socket.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
-public class SocketSinkConfigOptions {
-    private static final int DEFAULT_MAX_RETRIES = 3;
+public class SocketCommonOptions {
+
+    public static final String identifier = "Socket";
 
     public static final Option<String> HOST =
             
Options.key("host").stringType().noDefaultValue().withDescription("socket 
host");
 
     public static final Option<Integer> PORT =
             
Options.key("port").intType().noDefaultValue().withDescription("socket port");
-
-    public static final Option<Integer> MAX_RETRIES =
-            Options.key("max_retries")
-                    .intType()
-                    .defaultValue(DEFAULT_MAX_RETRIES)
-                    .withDescription("default value is " + DEFAULT_MAX_RETRIES 
+ ", max retries");
 }
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketConfig.java
similarity index 59%
rename from 
seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java
rename to 
seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketConfig.java
index d41d06fba6..7a1ff52eb2 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketConfig.java
@@ -17,27 +17,21 @@
 
 package org.apache.seatunnel.connectors.seatunnel.socket.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 
 import lombok.Data;
 
 import java.io.Serializable;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
-import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.MAX_RETRIES;
-import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;
-
 @Data
-public class SinkConfig implements Serializable {
+public class SocketConfig implements Serializable {
     private String host;
     private int port;
     private int maxNumRetries;
 
-    public SinkConfig(Config config) {
-        this.host = config.getString(HOST.key());
-        this.port = config.getInt(PORT.key());
-        if (config.hasPath(MAX_RETRIES.key())) {
-            this.maxNumRetries = config.getInt(MAX_RETRIES.key());
-        }
+    public SocketConfig(ReadonlyConfig config) {
+        this.host = config.get(SocketCommonOptions.HOST);
+        this.port = config.get(SocketCommonOptions.PORT);
+        this.maxNumRetries = config.get(SocketSinkOptions.MAX_RETRIES);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkOptions.java
similarity index 80%
copy from 
seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java
copy to 
seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkOptions.java
index 2de6578fd2..5364afb436 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkOptions.java
@@ -20,14 +20,9 @@ package 
org.apache.seatunnel.connectors.seatunnel.socket.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
-public class SocketSinkConfigOptions {
-    private static final int DEFAULT_MAX_RETRIES = 3;
-
-    public static final Option<String> HOST =
-            
Options.key("host").stringType().noDefaultValue().withDescription("socket 
host");
+public class SocketSinkOptions extends SocketCommonOptions {
 
-    public static final Option<Integer> PORT =
-            
Options.key("port").intType().noDefaultValue().withDescription("socket port");
+    private static final int DEFAULT_MAX_RETRIES = 3;
 
     public static final Option<Integer> MAX_RETRIES =
             Options.key("max_retries")
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSourceOptions.java
similarity index 53%
rename from 
seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java
rename to 
seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSourceOptions.java
index 2de6578fd2..71823fea29 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSinkConfigOptions.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/config/SocketSourceOptions.java
@@ -17,21 +17,4 @@
 
 package org.apache.seatunnel.connectors.seatunnel.socket.config;
 
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-
-public class SocketSinkConfigOptions {
-    private static final int DEFAULT_MAX_RETRIES = 3;
-
-    public static final Option<String> HOST =
-            
Options.key("host").stringType().noDefaultValue().withDescription("socket 
host");
-
-    public static final Option<Integer> PORT =
-            
Options.key("port").intType().noDefaultValue().withDescription("socket port");
-
-    public static final Option<Integer> MAX_RETRIES =
-            Options.key("max_retries")
-                    .intType()
-                    .defaultValue(DEFAULT_MAX_RETRIES)
-                    .withDescription("default value is " + DEFAULT_MAX_RETRIES 
+ ", max retries");
-}
+public class SocketSourceOptions extends SocketCommonOptions {}
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java
index ef44e291c9..4d5b4029f3 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketClient.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.socket.sink;
 
 import org.apache.seatunnel.api.serialization.SerializationSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.socket.config.SocketConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.socket.exception.SocketConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.socket.exception.SocketConnectorException;
 
@@ -42,7 +42,7 @@ public class SocketClient {
     private volatile boolean isRunning = Boolean.TRUE;
     private static final int CONNECTION_RETRY_DELAY = 500;
 
-    public SocketClient(SinkConfig config, SerializationSchema 
serializationSchema) {
+    public SocketClient(SocketConfig config, SerializationSchema 
serializationSchema) {
         this.hostName = config.getHost();
         this.port = config.getPort();
         this.serializationSchema = serializationSchema;
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
index 87bff65a9a..88a05072d1 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
@@ -17,69 +17,41 @@
 
 package org.apache.seatunnel.connectors.seatunnel.socket.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.socket.exception.SocketConnectorException;
-
-import com.google.auto.service.AutoService;
+import org.apache.seatunnel.connectors.seatunnel.socket.config.SocketConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkOptions;
 
 import java.io.IOException;
 import java.util.Optional;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
-import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;
-
-@AutoService(SeaTunnelSink.class)
 public class SocketSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
-    private Config pluginConfig;
-    private SinkConfig sinkConfig;
-    private SeaTunnelRowType seaTunnelRowType;
 
-    @Override
-    public String getPluginName() {
-        return "Socket";
-    }
+    private final SocketConfig socketConfig;
+    private final CatalogTable catalogTable;
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        this.pluginConfig = pluginConfig;
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
PORT.key(), HOST.key());
-        if (!result.isSuccess()) {
-            throw new SocketConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
-        sinkConfig = new SinkConfig(pluginConfig);
+    public SocketSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
+        this.socketConfig = new SocketConfig(pluginConfig);
+        this.catalogTable = catalogTable;
     }
 
     @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
+    public String getPluginName() {
+        return SocketSinkOptions.identifier;
     }
 
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
-        return new SocketSinkWriter(sinkConfig, seaTunnelRowType);
+        return new SocketSinkWriter(socketConfig, 
catalogTable.getSeaTunnelRowType());
     }
 
     @Override
     public Optional<CatalogTable> getWriteCatalogTable() {
-        return super.getWriteCatalogTable();
+        return Optional.of(catalogTable);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkFactory.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkFactory.java
index 9f86c5e78e..6ea535923d 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkFactory.java
@@ -18,24 +18,31 @@
 package org.apache.seatunnel.connectors.seatunnel.socket.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkOptions;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
-import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.MAX_RETRIES;
-import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;
-
 @AutoService(Factory.class)
 public class SocketSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return "Socket";
+        return SocketSinkOptions.identifier;
     }
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().required(HOST, 
PORT).optional(MAX_RETRIES).build();
+        return OptionRule.builder()
+                .required(SocketSinkOptions.HOST, SocketSinkOptions.PORT)
+                .optional(SocketSinkOptions.MAX_RETRIES)
+                .build();
+    }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        return () -> new SocketSink(context.getOptions(), 
context.getCatalogTable());
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java
index c901abfc15..3e3cb13587 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSinkWriter.java
@@ -17,24 +17,21 @@
 
 package org.apache.seatunnel.connectors.seatunnel.socket.sink;
 
-import org.apache.seatunnel.api.serialization.SerializationSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.socket.config.SocketConfig;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 
 import java.io.IOException;
 
 public class SocketSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
     private final SocketClient socketClient;
-    private final SerializationSchema serializationSchema;
-    private final SinkConfig sinkConfig;
 
-    SocketSinkWriter(SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType) 
throws IOException {
-        this.sinkConfig = sinkConfig;
-        this.serializationSchema = new 
JsonSerializationSchema(seaTunnelRowType);
-        this.socketClient = new SocketClient(sinkConfig, serializationSchema);
+    SocketSinkWriter(SocketConfig socketConfig, SeaTunnelRowType 
seaTunnelRowType)
+            throws IOException {
+        this.socketClient =
+                new SocketClient(socketConfig, new 
JsonSerializationSchema(seaTunnelRowType));
         socketClient.open();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
index a115924cfb..99d66b0034 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
@@ -17,36 +17,39 @@
 
 package org.apache.seatunnel.connectors.seatunnel.socket.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Boundedness;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
-import 
org.apache.seatunnel.connectors.seatunnel.socket.exception.SocketConnectorException;
-
-import com.google.auto.service.AutoService;
+import org.apache.seatunnel.connectors.seatunnel.socket.config.SocketConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSourceOptions;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
-import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;
+import java.util.Collections;
+import java.util.List;
 
-@AutoService(SeaTunnelSource.class)
 public class SocketSource extends AbstractSingleSplitSource<SeaTunnelRow> {
-    private SocketSourceParameter parameter;
+    private final SocketConfig parameter;
+    private final CatalogTable catalogTable;
     private JobContext jobContext;
 
+    public SocketSource(ReadonlyConfig pluginConfig) {
+        this.parameter = new SocketConfig(pluginConfig);
+        SeaTunnelRowType seaTunnelRowType =
+                new SeaTunnelRowType(
+                        new String[] {"value"}, new SeaTunnelDataType<?>[] 
{BasicType.STRING_TYPE});
+        this.catalogTable =
+                
CatalogTableUtil.getCatalogTable(SocketSourceOptions.identifier, 
seaTunnelRowType);
+    }
+
     @Override
     public Boundedness getBoundedness() {
         return JobMode.BATCH.equals(jobContext.getJobMode())
@@ -56,20 +59,7 @@ public class SocketSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
 
     @Override
     public String getPluginName() {
-        return "Socket";
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
PORT.key(), HOST.key());
-        if (!result.isSuccess()) {
-            throw new SocketConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
-        }
-        this.parameter = new SocketSourceParameter(pluginConfig);
+        return SocketSourceOptions.identifier;
     }
 
     @Override
@@ -78,9 +68,8 @@ public class SocketSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
     }
 
     @Override
-    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
-        return new SeaTunnelRowType(
-                new String[] {"value"}, new SeaTunnelDataType<?>[] 
{BasicType.STRING_TYPE});
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(catalogTable);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceFactory.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceFactory.java
index 11ecefa718..4bc6d38210 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceFactory.java
@@ -19,24 +19,35 @@ package 
org.apache.seatunnel.connectors.seatunnel.socket.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSourceOptions;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
-import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;
+import java.io.Serializable;
 
 @AutoService(Factory.class)
 public class SocketSourceFactory implements TableSourceFactory {
     @Override
     public String factoryIdentifier() {
-        return "Socket";
+        return SocketSourceOptions.identifier;
     }
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().required(HOST, PORT).build();
+        return OptionRule.builder()
+                .required(SocketSourceOptions.HOST, SocketSourceOptions.PORT)
+                .build();
+    }
+
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () -> (SeaTunnelSource<T, SplitT, StateT>) new 
SocketSource(context.getOptions());
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java
deleted file mode 100644
index b7ce011ae0..0000000000
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.socket.source;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
-import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;
-
-public class SocketSourceParameter implements Serializable {
-    private final String host;
-    private final Integer port;
-
-    public String getHost() {
-        return StringUtils.isBlank(host) ? HOST.defaultValue() : host;
-    }
-
-    public Integer getPort() {
-        return Objects.isNull(port) ? PORT.defaultValue() : port;
-    }
-
-    public SocketSourceParameter(Config config) {
-        if (config.hasPath(HOST.key())) {
-            this.host = config.getString(HOST.key());
-        } else {
-            this.host = HOST.defaultValue();
-        }
-
-        if (config.hasPath(PORT.key())) {
-            this.port = config.getInt(PORT.key());
-        } else {
-            this.port = PORT.defaultValue();
-        }
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
index 903f948d8e..70b4326f1c 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.socket.config.SocketConfig;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -34,12 +35,12 @@ import java.net.Socket;
 @Slf4j
 public class SocketSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
     private static final int CHAR_BUFFER_SIZE = 8192;
-    private final SocketSourceParameter parameter;
+    private final SocketConfig parameter;
     private final SingleSplitReaderContext context;
     private Socket socket;
     private final String delimiter = "\n";
 
-    SocketSourceReader(SocketSourceParameter parameter, 
SingleSplitReaderContext context) {
+    SocketSourceReader(SocketConfig parameter, SingleSplitReaderContext 
context) {
         this.parameter = parameter;
         this.context = context;
     }

Reply via email to