This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new b6ed4ce5 [api-draft][connector] new socket source (#1999)
b6ed4ce5 is described below
commit b6ed4ce5b95420d5aae47a7708da68e36a7aa63a
Author: Kerwin <[email protected]>
AuthorDate: Mon Jun 13 11:11:37 2022 +0800
[api-draft][connector] new socket source (#1999)
* add new socket source connector
* fix code style.
* add license header
* fix socket source test exception
* code cleanup in the socket source
---
.../apache/seatunnel/api/source/SourceReader.java | 2 +-
seatunnel-connectors/plugin-mapping.properties | 1 +
.../seatunnel-connectors-seatunnel-dist/pom.xml | 5 +
.../seatunnel-connectors-seatunnel/pom.xml | 1 +
.../pom.xml | 20 ++--
.../seatunnel/socket/source/SocketSource.java | 86 ++++++++++++++++
.../socket/source/SocketSourceParameter.java | 47 +++++++++
.../socket/source/SocketSourceReader.java | 110 +++++++++++++++++++++
.../seatunnel/socket/source/SocketSourceSplit.java | 33 +++++++
.../socket/source/SocketSourceSplitEnumerator.java | 77 +++++++++++++++
.../seatunnel/socket/state/SocketState.java | 23 +++++
.../seatunnel-flink-new-connector-example/pom.xml | 5 +
12 files changed, 400 insertions(+), 10 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
index f9810e97..b8505902 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
@@ -33,7 +33,7 @@ public interface SourceReader<T, SplitT extends SourceSplit>
extends AutoCloseab
/**
* Open the source reader.
*/
- void open();
+ void open() throws Exception;
/**
* Called to close the reader, in case it holds on to any resources, like
threads or network
diff --git a/seatunnel-connectors/plugin-mapping.properties
b/seatunnel-connectors/plugin-mapping.properties
index e9cd9b86..e92ddbaa 100644
--- a/seatunnel-connectors/plugin-mapping.properties
+++ b/seatunnel-connectors/plugin-mapping.properties
@@ -89,4 +89,5 @@ seatunnel.source.FakeSource =
seatunnel-connector-seatunnel-fake
seatunnel.sink.Console = seatunnel-connector-seatunnel-console
seatunnel.source.Kafka = seatunnel-connector-seatunnel-kafka
seatunnel.sink.Kafka = seatunnel-connector-seatunnel-kafka
+seatunnel.source.Socket = seatunnel-connector-seatunnel-socket
seatunnel.sink.Hive = seatunnel-connector-seatunnel-hive
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
index e50ae874..42a03364 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
@@ -50,6 +50,11 @@
<artifactId>seatunnel-connector-seatunnel-hive</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-seatunnel-socket</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
index 9254ba5f..78e60052 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
@@ -35,5 +35,6 @@
<module>seatunnel-connector-seatunnel-console</module>
<module>seatunnel-connector-seatunnel-fake</module>
<module>seatunnel-connector-seatunnel-kafka</module>
+ <module>seatunnel-connector-seatunnel-socket</module>
</modules>
</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml
similarity index 74%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
copy to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml
index 9254ba5f..447067c7 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml
@@ -21,19 +21,21 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>seatunnel-connectors</artifactId>
+ <artifactId>seatunnel-connectors-seatunnel</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <packaging>pom</packaging>
- <artifactId>seatunnel-connectors-seatunnel</artifactId>
+ <artifactId>seatunnel-connector-seatunnel-socket</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ </dependencies>
- <modules>
- <module>seatunnel-connector-seatunnel-hive</module>
- <module>seatunnel-connector-seatunnel-console</module>
- <module>seatunnel-connector-seatunnel-fake</module>
- <module>seatunnel-connector-seatunnel-kafka</module>
- </modules>
</project>
\ No newline at end of file
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
new file mode 100644
index 00000000..d01dcfcd
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.source;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.connectors.seatunnel.socket.state.SocketState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * SeaTunnel source plugin that reads text records from a TCP socket.
+ *
+ * <p>Every record is exposed as a row with a single string field named {@code value}.
+ * Connection settings are bound from the plugin configuration into a
+ * {@link SocketSourceParameter} during {@link #prepare(Config)}.
+ */
+@AutoService(SeaTunnelSource.class)
+public class SocketSource implements SeaTunnelSource<SeaTunnelRow, SocketSourceSplit, SocketState> {
+
+    /** Connection settings; populated by {@link #prepare(Config)}. */
+    private SocketSourceParameter parameter;
+
+    /** Context handed over through {@link #setSeaTunnelContext(SeaTunnelContext)}. */
+    private SeaTunnelContext seaTunnelContext;
+
+    /**
+     * @return the plugin name, {@code "Socket"}
+     */
+    @Override
+    public String getPluginName() {
+        return "Socket";
+    }
+
+    /**
+     * Binds the plugin configuration to a {@link SocketSourceParameter} bean.
+     *
+     * @param pluginConfig configuration block of this source
+     * @throws PrepareFailException declared by the interface; not thrown directly by this method
+     */
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        this.parameter = ConfigBeanFactory.create(pluginConfig, SocketSourceParameter.class);
+    }
+
+    @Override
+    public SeaTunnelContext getSeaTunnelContext() {
+        return this.seaTunnelContext;
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
+
+    /**
+     * @return a row type with one {@link BasicType#STRING} field named {@code value}
+     */
+    @Override
+    public SeaTunnelRowTypeInfo getRowTypeInfo() {
+        return new SeaTunnelRowTypeInfo(new String[]{"value"}, new SeaTunnelDataType<?>[]{BasicType.STRING});
+    }
+
+    /**
+     * Creates a reader that uses the settings captured in {@link #prepare(Config)}.
+     *
+     * @param readerContext runtime context of the reader
+     * @return a new {@link SocketSourceReader}
+     */
+    @Override
+    public SourceReader<SeaTunnelRow, SocketSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
+        return new SocketSourceReader(this.parameter, readerContext);
+    }
+
+    /**
+     * @param enumeratorContext runtime context of the enumerator
+     * @return a new {@link SocketSourceSplitEnumerator}
+     */
+    @Override
+    public SourceSplitEnumerator<SocketSourceSplit, SocketState> createEnumerator(SourceSplitEnumerator.Context<SocketSourceSplit> enumeratorContext) throws Exception {
+        return new SocketSourceSplitEnumerator(enumeratorContext);
+    }
+
+    /**
+     * Restores the enumerator after a checkpoint recovery.
+     *
+     * <p>{@link SocketState} has no fields, so there is nothing to restore from
+     * {@code checkpointState}; a fresh enumerator is returned instead of {@code null}
+     * so that callers which open or run the restored enumerator do not fail with a
+     * {@link NullPointerException}.
+     *
+     * @param enumeratorContext runtime context of the enumerator
+     * @param checkpointState   previously snapshotted state; ignored
+     * @return a new {@link SocketSourceSplitEnumerator}
+     */
+    @Override
+    public SourceSplitEnumerator<SocketSourceSplit, SocketState> restoreEnumerator(SourceSplitEnumerator.Context<SocketSourceSplit> enumeratorContext, SocketState checkpointState) throws Exception {
+        return new SocketSourceSplitEnumerator(enumeratorContext);
+    }
+
+    /**
+     * @return a {@link DefaultSerializer} for the enumerator state
+     */
+    @Override
+    public Serializer<SocketState> getEnumeratorStateSerializer() {
+        return new DefaultSerializer<>();
+    }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java
new file mode 100644
index 00000000..dbbc7eca
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceParameter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.source;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Bean holding the connection settings of the socket source.
+ *
+ * <p>Both settings are optional: the getters fall back to {@code localhost} and
+ * {@code 9999} when no usable value has been set.
+ */
+public class SocketSourceParameter implements Serializable {
+
+    private static final String DEFAULT_HOST = "localhost";
+    private static final int DEFAULT_PORT = 9999;
+
+    private String host;
+    private Integer port;
+
+    /**
+     * @return the configured host, or {@code localhost} when it is null, empty or blank
+     */
+    public String getHost() {
+        if (StringUtils.isBlank(host)) {
+            return DEFAULT_HOST;
+        }
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    /**
+     * @return the configured port, or {@code 9999} when none has been set
+     */
+    public Integer getPort() {
+        if (Objects.nonNull(port)) {
+            return port;
+        }
+        return DEFAULT_PORT;
+    }
+
+    public void setPort(Integer port) {
+        this.port = port;
+    }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
new file mode 100644
index 00000000..5431b55a
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Source reader that connects to a TCP server and converts the received text into rows.
+ *
+ * <p>The character stream is split on the line-feed delimiter. Each record is emitted as a
+ * {@link SeaTunnelRow} with a single string field; a trailing carriage return is removed so
+ * that both {@code \n} and {@code \r\n} terminated input is handled.
+ */
+public class SocketSourceReader implements SourceReader<SeaTunnelRow, SocketSourceSplit> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(SocketSourceReader.class);
+    private static final int CHAR_BUFFER_SIZE = 8192;
+    private final SocketSourceParameter parameter;
+    private final SourceReader.Context context;
+    private final String delimiter = "\n";
+    private Socket socket;
+
+    SocketSourceReader(SocketSourceParameter parameter, SourceReader.Context context) {
+        this.parameter = parameter;
+        this.context = context;
+    }
+
+    /**
+     * Opens the TCP connection to the configured host and port.
+     *
+     * <p>A connect timeout of {@code 0} is used, i.e. the call blocks until the connection
+     * is established or fails.
+     *
+     * @throws Exception if the connection cannot be established
+     */
+    @Override
+    public void open() throws Exception {
+        socket = new Socket();
+        LOGGER.info("connect socket server, host:[{}], port:[{}] ", this.parameter.getHost(), this.parameter.getPort());
+        socket.connect(new InetSocketAddress(this.parameter.getHost(), this.parameter.getPort()), 0);
+    }
+
+    /**
+     * Closes the socket if {@link #open()} created one.
+     */
+    @Override
+    public void close() throws IOException {
+        if (socket != null) {
+            socket.close();
+        }
+    }
+
+    /**
+     * Reads from the socket and emits one row per delimited record.
+     *
+     * <p>In bounded mode the method stops after the first chunk that was read and signals
+     * that no more elements will follow; the end of the stream is signalled as well when the
+     * peer closes the connection before sending any data. In unbounded mode it keeps reading
+     * until the peer closes the connection. Whatever is left in the buffer afterwards is
+     * emitted as a final record.
+     *
+     * <p>NOTE(review): the reader is closed when this method returns, which also closes the
+     * socket's input stream, so a subsequent call cannot read from the same connection.
+     *
+     * @param output collector receiving the rows
+     * @throws Exception if reading from the socket fails
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        final boolean bounded = Boundedness.BOUNDED.equals(context.getBoundedness());
+        boolean endSignalled = false;
+        StringBuilder buffer = new StringBuilder();
+        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
+        try (BufferedReader reader = new BufferedReader(
+                new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
+            char[] chunk = new char[CHAR_BUFFER_SIZE];
+            int charsRead;
+            while ((charsRead = reader.read(chunk)) != -1) {
+                buffer.append(chunk, 0, charsRead);
+                emitCompleteRecords(buffer, output);
+                if (bounded) {
+                    // signal to the source that we have reached the end of the data.
+                    context.signalNoMoreElement();
+                    endSignalled = true;
+                    break;
+                }
+            }
+        }
+        if (bounded && !endSignalled) {
+            // The stream ended before any data arrived; a bounded job must still be told to finish.
+            context.signalNoMoreElement();
+        }
+        if (buffer.length() > 0) {
+            output.collect(new SeaTunnelRow(new Object[]{buffer.toString()}));
+        }
+    }
+
+    /**
+     * Emits every complete record currently held in {@code buffer} and removes it,
+     * leaving only the trailing partial record (if any) behind.
+     */
+    private void emitCompleteRecords(StringBuilder buffer, Collector<SeaTunnelRow> output) {
+        int delimPos;
+        while ((delimPos = buffer.indexOf(this.delimiter)) != -1) {
+            String record = buffer.substring(0, delimPos);
+            if ("\n".equals(this.delimiter) && record.endsWith("\r")) {
+                // Strip the carriage return of a CRLF line ending.
+                record = record.substring(0, record.length() - 1);
+            }
+            output.collect(new SeaTunnelRow(new Object[]{record}));
+            buffer.delete(0, delimPos + this.delimiter.length());
+        }
+    }
+
+    /**
+     * This reader keeps no split state.
+     *
+     * @return an empty, immutable list (never {@code null})
+     */
+    @Override
+    public List<SocketSourceSplit> snapshotState(long checkpointId) throws Exception {
+        return Collections.emptyList();
+    }
+
+    /** No-op: this reader does not consume splits. */
+    @Override
+    public void addSplits(List<SocketSourceSplit> splits) {
+
+    }
+
+    /** No-op: this reader does not consume splits. */
+    @Override
+    public void handleNoMoreSplits() {
+
+    }
+
+    /** No-op: there is nothing to commit when a checkpoint completes. */
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplit.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplit.java
new file mode 100644
index 00000000..cfdd2cda
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplit.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+/**
+ * Split type of the socket source; it carries nothing but its identifier.
+ */
+public class SocketSourceSplit implements SourceSplit {
+
+    /** Identifier returned by {@link #splitId()}. */
+    private final String splitId;
+
+    /**
+     * @param splitId identifier of this split
+     */
+    public SocketSourceSplit(String splitId) {
+        this.splitId = splitId;
+    }
+
+    /**
+     * @return the identifier passed to the constructor
+     */
+    @Override
+    public String splitId() {
+        return this.splitId;
+    }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplitEnumerator.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplitEnumerator.java
new file mode 100644
index 00000000..0d789e2b
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplitEnumerator.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.socket.state.SocketState;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Split enumerator of the socket source.
+ *
+ * <p>Every callback is a no-op: the enumerator never creates or assigns splits and
+ * keeps no state.
+ */
+public class SocketSourceSplitEnumerator implements SourceSplitEnumerator<SocketSourceSplit, SocketState> {
+
+    /** Context supplied by the framework; stored but not used by any callback here. */
+    private final SourceSplitEnumerator.Context<SocketSourceSplit> enumeratorContext;
+
+    /**
+     * @param enumeratorContext runtime context of the enumerator
+     */
+    public SocketSourceSplitEnumerator(SourceSplitEnumerator.Context<SocketSourceSplit> enumeratorContext) {
+        this.enumeratorContext = enumeratorContext;
+    }
+
+    /** No-op. */
+    @Override
+    public void open() {
+    }
+
+    /** No-op: no splits are discovered or assigned. */
+    @Override
+    public void run() throws Exception {
+    }
+
+    /** No-op: no resources are held. */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /** No-op: returned splits are ignored. */
+    @Override
+    public void addSplitsBack(List<SocketSourceSplit> splits, int subtaskId) {
+    }
+
+    /**
+     * @return always {@code 0}
+     */
+    @Override
+    public int currentUnassignedSplitSize() {
+        return 0;
+    }
+
+    /** No-op: split requests are not served. */
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+    }
+
+    /** No-op: readers are not tracked. */
+    @Override
+    public void registerReader(int subtaskId) {
+    }
+
+    /**
+     * @return always {@code null}; the enumerator has no state to snapshot
+     */
+    @Override
+    public SocketState snapshotState(long checkpointId) throws Exception {
+        return null;
+    }
+
+    /** No-op. */
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+    }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/state/SocketState.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/state/SocketState.java
new file mode 100644
index 00000000..f5d58219
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/state/SocketState.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.socket.state;
+
+import java.io.Serializable;
+
+/**
+ * Checkpoint state type of the socket source enumerator.
+ *
+ * <p>The class declares no fields; it only serves as the serializable state type.
+ */
+public class SocketState implements Serializable {
+
+}
diff --git a/seatunnel-examples/seatunnel-flink-new-connector-example/pom.xml
b/seatunnel-examples/seatunnel-flink-new-connector-example/pom.xml
index c4f1338a..a1466827 100644
--- a/seatunnel-examples/seatunnel-flink-new-connector-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-new-connector-example/pom.xml
@@ -56,6 +56,11 @@
<artifactId>seatunnel-connector-seatunnel-console</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-seatunnel-socket</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- seatunnel connectors -->
<!--flink-->