This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new b6ed4ce5 [api-draft][connector] new socket source  (#1999)
b6ed4ce5 is described below

commit b6ed4ce5b95420d5aae47a7708da68e36a7aa63a
Author: Kerwin <[email protected]>
AuthorDate: Mon Jun 13 11:11:37 2022 +0800

    [api-draft][connector] new socket source  (#1999)
    
    * add new socket source connector
    
    * fix code style.
    
    * add license header
    
    * fix socket source test exception
    
    * code cleanup in the socket source
---
 .../apache/seatunnel/api/source/SourceReader.java  |   2 +-
 seatunnel-connectors/plugin-mapping.properties     |   1 +
 .../seatunnel-connectors-seatunnel-dist/pom.xml    |   5 +
 .../seatunnel-connectors-seatunnel/pom.xml         |   1 +
 .../pom.xml                                        |  20 ++--
 .../seatunnel/socket/source/SocketSource.java      |  86 ++++++++++++++++
 .../socket/source/SocketSourceParameter.java       |  47 +++++++++
 .../socket/source/SocketSourceReader.java          | 110 +++++++++++++++++++++
 .../seatunnel/socket/source/SocketSourceSplit.java |  33 +++++++
 .../socket/source/SocketSourceSplitEnumerator.java |  77 +++++++++++++++
 .../seatunnel/socket/state/SocketState.java        |  23 +++++
 .../seatunnel-flink-new-connector-example/pom.xml  |   5 +
 12 files changed, 400 insertions(+), 10 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
index f9810e97..b8505902 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
@@ -33,7 +33,7 @@ public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseab
     /**
      * Open the source reader.
      */
-    void open();
+    void open() throws Exception;
 
     /**
      * Called to close the reader, in case it holds on to any resources, like threads or network
diff --git a/seatunnel-connectors/plugin-mapping.properties b/seatunnel-connectors/plugin-mapping.properties
index e9cd9b86..e92ddbaa 100644
--- a/seatunnel-connectors/plugin-mapping.properties
+++ b/seatunnel-connectors/plugin-mapping.properties
@@ -89,4 +89,5 @@ seatunnel.source.FakeSource = seatunnel-connector-seatunnel-fake
 seatunnel.sink.Console = seatunnel-connector-seatunnel-console
 seatunnel.source.Kafka = seatunnel-connector-seatunnel-kafka
 seatunnel.sink.Kafka = seatunnel-connector-seatunnel-kafka
+seatunnel.source.Socket = seatunnel-connector-seatunnel-socket
 seatunnel.sink.Hive = seatunnel-connector-seatunnel-hive
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
index e50ae874..42a03364 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
@@ -50,6 +50,11 @@
             <artifactId>seatunnel-connector-seatunnel-hive</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-connector-seatunnel-socket</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
index 9254ba5f..78e60052 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
@@ -35,5 +35,6 @@
         <module>seatunnel-connector-seatunnel-console</module>
         <module>seatunnel-connector-seatunnel-fake</module>
         <module>seatunnel-connector-seatunnel-kafka</module>
+        <module>seatunnel-connector-seatunnel-socket</module>
     </modules>
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml
similarity index 74%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml
index 9254ba5f..447067c7 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml
@@ -21,19 +21,21 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel-connectors</artifactId>
+        <artifactId>seatunnel-connectors-seatunnel</artifactId>
         <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
 
-    <artifactId>seatunnel-connectors-seatunnel</artifactId>
+    <artifactId>seatunnel-connector-seatunnel-socket</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+    </dependencies>
 
-    <modules>
-        <module>seatunnel-connector-seatunnel-hive</module>
-        <module>seatunnel-connector-seatunnel-console</module>
-        <module>seatunnel-connector-seatunnel-fake</module>
-        <module>seatunnel-connector-seatunnel-kafka</module>
-    </modules>
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
new file mode 100644
index 00000000..d01dcfcd
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.source;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.connectors.seatunnel.socket.state.SocketState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSource.class)
+public class SocketSource implements SeaTunnelSource<SeaTunnelRow, 
SocketSourceSplit, SocketState> {
+    private SocketSourceParameter parameter;
+    private SeaTunnelContext seaTunnelContext;
+    @Override
+    public String getPluginName() {
+        return "Socket";
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        this.parameter = ConfigBeanFactory.create(pluginConfig, 
SocketSourceParameter.class);
+    }
+
+    @Override
+    public SeaTunnelContext getSeaTunnelContext() {
+        return this.seaTunnelContext;
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
+
+    @Override
+    public SeaTunnelRowTypeInfo getRowTypeInfo() {
+        return new SeaTunnelRowTypeInfo(new String[]{"value"}, new 
SeaTunnelDataType<?>[]{BasicType.STRING});
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, SocketSourceSplit> 
createReader(SourceReader.Context readerContext) throws Exception {
+        return new SocketSourceReader(this.parameter, readerContext);
+    }
+
+    @Override
+    public SourceSplitEnumerator<SocketSourceSplit, SocketState> 
createEnumerator(SourceSplitEnumerator.Context<SocketSourceSplit> 
enumeratorContext) throws Exception {
+        return new SocketSourceSplitEnumerator(enumeratorContext);
+    }
+
+    @Override
+    public SourceSplitEnumerator<SocketSourceSplit, SocketState> 
restoreEnumerator(SourceSplitEnumerator.Context<SocketSourceSplit> 
enumeratorContext, SocketState checkpointState) throws Exception {
+        return null;
+    }
+
+    @Override
+    public Serializer<SocketState> getEnumeratorStateSerializer() {
+        return new DefaultSerializer<>();
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java
new file mode 100644
index 00000000..dbbc7eca
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.source;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class SocketSourceParameter implements Serializable {
+
+    private static final String DEFAULT_HOST = "localhost";
+    private static final int DEFAULT_PORT = 9999;
+    private String host;
+    private Integer port;
+
+    public String getHost() {
+        return StringUtils.isBlank(host) ? DEFAULT_HOST : host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public Integer getPort() {
+        return Objects.isNull(port) ? DEFAULT_PORT : port;
+    }
+
+    public void setPort(Integer port) {
+        this.port = port;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
new file mode 100644
index 00000000..5431b55a
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.List;
+
+public class SocketSourceReader implements SourceReader<SeaTunnelRow, 
SocketSourceSplit> {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SocketSourceReader.class);
+    private static final int CHAR_BUFFER_SIZE = 8192;
+    private final SocketSourceParameter parameter;
+    private final SourceReader.Context context;
+    private Socket socket;
+    private String delimiter = "\n";
+    SocketSourceReader(SocketSourceParameter parameter, SourceReader.Context 
context) {
+        this.parameter = parameter;
+        this.context = context;
+    }
+
+    @Override
+    public void open() throws Exception {
+        socket = new Socket();
+        LOGGER.info("connect socket server, host:[{}], port:[{}] ", 
this.parameter.getHost(), this.parameter.getPort());
+        socket.connect(new InetSocketAddress(this.parameter.getHost(), 
this.parameter.getPort()), 0);
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (socket != null) {
+            socket.close();
+        }
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        StringBuilder buffer = new StringBuilder();
+        try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream()))) {
+            char[] buf = new char[CHAR_BUFFER_SIZE];
+            int bytesRead;
+            while ((bytesRead = reader.read(buf)) != -1) {
+                buffer.append(buf, 0, bytesRead);
+
+                int delimPos;
+                while (buffer.length() >= this.delimiter.length() && (delimPos 
= buffer.indexOf(this.delimiter)) != -1) {
+                    String record = buffer.substring(0, delimPos);
+                    if (this.delimiter.equals("\n") && record.endsWith("\r")) {
+                        record = record.substring(0, record.length() - 1);
+                    }
+                    output.collect(new SeaTunnelRow(new Object[]{record}));
+                    buffer.delete(0, delimPos + this.delimiter.length());
+                }
+                if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+                    // signal to the source that we have reached the end of 
the data.
+                    context.signalNoMoreElement();
+                    break;
+                }
+            }
+        }
+        if (buffer.length() > 0) {
+            output.collect(new SeaTunnelRow(new Object[]{buffer.toString()}));
+        }
+    }
+
+    @Override
+    public List<SocketSourceSplit> snapshotState(long checkpointId) throws 
Exception {
+        return null;
+    }
+
+    @Override
+    public void addSplits(List<SocketSourceSplit> splits) {
+
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplit.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplit.java
new file mode 100644
index 00000000..cfdd2cda
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplit.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+public class SocketSourceSplit implements SourceSplit {
+    private final String splitId;
+
+    public SocketSourceSplit(String splitId) {
+        this.splitId = splitId;
+    }
+
+    @Override
+    public String splitId() {
+        return splitId;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplitEnumerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplitEnumerator.java
new file mode 100644
index 00000000..0d789e2b
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplitEnumerator.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.socket.state.SocketState;
+
+import java.io.IOException;
+import java.util.List;
+
+public class SocketSourceSplitEnumerator implements 
SourceSplitEnumerator<SocketSourceSplit, SocketState> {
+
+    private final SourceSplitEnumerator.Context<SocketSourceSplit> 
enumeratorContext;
+
+    public 
SocketSourceSplitEnumerator(SourceSplitEnumerator.Context<SocketSourceSplit> 
enumeratorContext) {
+        this.enumeratorContext = enumeratorContext;
+    }
+
+    @Override
+    public void open() {
+    }
+
+    @Override
+    public void run() throws Exception {
+
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public void addSplitsBack(List<SocketSourceSplit> splits, int subtaskId) {
+
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return 0;
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+
+    }
+
+    @Override
+    public SocketState snapshotState(long checkpointId) throws Exception {
+        return null;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/state/SocketState.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/state/SocketState.java
new file mode 100644
index 00000000..f5d58219
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/state/SocketState.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.state;
+
+import java.io.Serializable;
+
+public class SocketState implements Serializable {
+}
diff --git a/seatunnel-examples/seatunnel-flink-new-connector-example/pom.xml b/seatunnel-examples/seatunnel-flink-new-connector-example/pom.xml
index c4f1338a..a1466827 100644
--- a/seatunnel-examples/seatunnel-flink-new-connector-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-new-connector-example/pom.xml
@@ -56,6 +56,11 @@
             <artifactId>seatunnel-connector-seatunnel-console</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-connector-seatunnel-socket</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!--   seatunnel connectors   -->
 
         <!--flink-->

Reply via email to