This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 576609b [Pulsar-IO] Add Netty Tcp Source Support (#3179)
576609b is described below
commit 576609b01319f925a948c0754498edbdf880a86c
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Sun Dec 16 01:35:43 2018 +0000
[Pulsar-IO] Add Netty Tcp Source Support (#3179)
### Motivation
Netty is an NIO client-server framework that supports asynchronous,
event-driven communication and custom protocol implementations.
Ref: https://netty.io/
This PR proposes a Pulsar-IO Netty Source Connector aimed at TCP clients.
It starts an embedded TCP server that listens for incoming TCP messages and
writes them to a user-defined Pulsar topic.
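A minimal sketch of a TCP client that could feed such a source, using only the
JDK. The class name `TcpSourceClientSketch` and its `send` helper are
hypothetical and not part of this PR. So that the sketch runs on its own,
`main` talks to a stand-in `ServerSocket` on an ephemeral port rather than to a
running connector.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of this PR: writes one payload to a TCP server. */
public class TcpSourceClientSketch {

    // Opens a connection, writes the payload, and closes the socket.
    static void send(String host, int port, byte[] payload) throws IOException {
        try (Socket socket = new Socket(host, port);
             OutputStream out = socket.getOutputStream()) {
            out.write(payload);
            out.flush();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in listener (an assumption for this demo) instead of the real connector.
        try (ServerSocket standIn = new ServerSocket(0)) {
            Thread client = new Thread(() -> {
                try {
                    send("127.0.0.1", standIn.getLocalPort(),
                            "hello".getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            client.start();
            try (Socket accepted = standIn.accept()) {
                // Read until the client closes its side of the connection.
                byte[] received = accepted.getInputStream().readAllBytes();
                System.out.println(new String(received, StandardCharsets.UTF_8));
            }
            client.join();
        }
    }
}
```

Against a running connector with the default settings from this commit, the
call would presumably be `send("127.0.0.1", 10999, payload)`.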
There are also other potential use cases (TCP, HTTP and UDP messages) for
this module, as follows:
- TCP Client (Pulsar-IO Sink): listens for Pulsar messages and writes them
to a remote TCP server.
- HTTP Server and Client (Pulsar-IO Source and Sink)
- UDP Server and Client (Pulsar-IO Source and Sink)
This PR follows up on #3095. The module has been moved into Pulsar-IO in
light of the previous discussion.
### Modifications
1- `NettyTcpServer`: Initializes an embedded TCP server that listens for
incoming TCP requests
2- `NettyTcpServerHandler`: Inbound channel handler that handles incoming TCP
requests
3- `NettyChannelInitializer`: Channel initializer that supports different
types of decoders and handlers
4- `NettyTcpSource`: A push-based source that listens for TCP messages and
writes them to a user-defined Pulsar topic
5- `NettyTcpSourceConfig`: Supports user-defined config loaded from either a
Map or a YAML file
6- Unit test coverage
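
The push-based hand-off in item 4 can be sketched with a plain blocking queue.
This is a simplified stand-in and not Pulsar's actual `PushSource` class: the
class name `PushSourceSketch`, the queue capacity, and the method bodies are
assumptions made for illustration. The idea is that the inbound handler pushes
each decoded message via `consume`, while the runtime pulls messages via `read`.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Simplified stand-in for a push-based source; not Pulsar's PushSource. */
public class PushSourceSketch {

    // Bounded buffer between the network thread and the reader.
    // The capacity of 1000 is an arbitrary choice for this sketch.
    private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>(1000);

    // Called by the inbound handler for each decoded message;
    // blocks while the buffer is full.
    public void consume(byte[] message) throws InterruptedException {
        queue.put(message);
    }

    // Called by the reader to fetch the next message;
    // blocks until one is available.
    public byte[] read() throws InterruptedException {
        return queue.take();
    }

    public static void main(String[] args) throws InterruptedException {
        PushSourceSketch source = new PushSourceSketch();
        source.consume("first".getBytes(StandardCharsets.UTF_8));
        source.consume("second".getBytes(StandardCharsets.UTF_8));
        // Messages come back in the order they were pushed.
        System.out.println(new String(source.read(), StandardCharsets.UTF_8));
        System.out.println(new String(source.read(), StandardCharsets.UTF_8));
    }
}
```

A bounded queue gives simple back-pressure: when the reader falls behind,
`consume` blocks the pushing thread instead of letting the buffer grow without
limit.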
---
pulsar-io/netty/pom.xml | 86 ++++++++++++++
.../org/apache/pulsar/io/netty/NettyTcpSource.java | 77 +++++++++++++
.../pulsar/io/netty/NettyTcpSourceConfig.java | 74 +++++++++++++
.../netty/tcp/server/NettyChannelInitializer.java | 43 +++++++
.../pulsar/io/netty/tcp/server/NettyTcpServer.java | 123 +++++++++++++++++++++
.../io/netty/tcp/server/NettyTcpServerHandler.java | 62 +++++++++++
.../resources/META-INF/services/pulsar-io.yaml | 22 ++++
.../pulsar/io/netty/NettyTcpSourceConfigTest.java | 82 ++++++++++++++
.../tcp/server/NettyChannelInitializerTest.java | 45 ++++++++
.../io/netty/tcp/server/NettyTcpServerTest.java | 123 +++++++++++++++++++++
.../src/test/resources/nettyTcpSourceConfig.yaml | 24 ++++
.../nettyTcpSourceConfigWithInvalidProperty.yaml | 24 ++++
pulsar-io/pom.xml | 1 +
site2/docs/io-connectors.md | 1 +
site2/docs/io-tcp.md | 19 ++++
15 files changed, 806 insertions(+)
diff --git a/pulsar-io/netty/pom.xml b/pulsar-io/netty/pom.xml
new file mode 100644
index 0000000..40dabc9
--- /dev/null
+++ b/pulsar-io/netty/pom.xml
@@ -0,0 +1,86 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-io</artifactId>
+ <version>2.3.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>pulsar-io-netty</artifactId>
+ <name>Pulsar IO :: Netty</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java
new file mode 100644
index 0000000..3cef730
--- /dev/null
+++
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty;
+
+import org.apache.pulsar.io.core.PushSource;
+import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.apache.pulsar.io.netty.tcp.server.NettyTcpServer;
+import java.util.Map;
+
+/**
+ * A simple Netty Tcp Source connector to listen Tcp messages and write to user-defined Pulsar topic
+ */
+@Connector(
+ name = "tcp",
+ type = IOType.SOURCE,
+ help = "A simple Netty Tcp Source connector to listen Tcp messages and write to user-defined Pulsar topic",
+ configClass = NettyTcpSourceConfig.class)
+public class NettyTcpSource extends PushSource<byte[]> {
+
+ private NettyTcpServer nettyTcpServer;
+ private Thread thread;
+
+ @Override
+ public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+ NettyTcpSourceConfig nettyTcpSourceConfig = NettyTcpSourceConfig.load(config);
+
+ thread = new Thread(new PulsarTcpServerRunnable(nettyTcpSourceConfig, this));
+ thread.start();
+ }
+
+ @Override
+ public void close() throws Exception {
+ nettyTcpServer.shutdownGracefully();
+ }
+
+ private class PulsarTcpServerRunnable implements Runnable {
+
+ private NettyTcpSourceConfig nettyTcpSourceConfig;
+ private NettyTcpSource nettyTcpSource;
+
+ public PulsarTcpServerRunnable(NettyTcpSourceConfig nettyTcpSourceConfig, NettyTcpSource nettyTcpSource) {
+ this.nettyTcpSourceConfig = nettyTcpSourceConfig;
+ this.nettyTcpSource = nettyTcpSource;
+ }
+
+ @Override
+ public void run() {
+ nettyTcpServer = new NettyTcpServer.Builder()
+ .setHost(nettyTcpSourceConfig.getHost())
+ .setPort(nettyTcpSourceConfig.getPort())
+ .setNumberOfThreads(nettyTcpSourceConfig.getNumberOfThreads())
+ .setNettyTcpSource(nettyTcpSource)
+ .build();
+
+ nettyTcpServer.run();
+ }
+ }
+
+}
diff --git
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java
new file mode 100644
index 0000000..07b3cf8
--- /dev/null
+++
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.*;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Netty Tcp Source Connector Config.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class NettyTcpSourceConfig implements Serializable {
+
+ private static final long serialVersionUID = -7116130435021510496L;
+
+ @FieldDoc(
+ required = true,
+ defaultValue = "127.0.0.1",
+ help = "The host name or address that the source instance to listen on")
+ private String host = "127.0.0.1";
+
+ @FieldDoc(
+ required = true,
+ defaultValue = "10999",
+ help = "The port that the source instance to listen on")
+ private int port = 10999;
+
+ @FieldDoc(
+ required = true,
+ defaultValue = "1",
+ help = "The number of threads of Netty Tcp Server to accept incoming connections and " +
+ "handle the traffic of the accepted connections")
+ private int numberOfThreads = 1;
+
+ public static NettyTcpSourceConfig load(Map<String, Object> map) throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.readValue(new ObjectMapper().writeValueAsString(map), NettyTcpSourceConfig.class);
+ }
+
+ public static NettyTcpSourceConfig load(String yamlFile) throws IOException {
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ return mapper.readValue(new File(yamlFile), NettyTcpSourceConfig.class);
+ }
+
+}
diff --git
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java
new file mode 100644
index 0000000..f88f514
--- /dev/null
+++
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.tcp.server;
+
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.bytes.ByteArrayDecoder;
+
+/**
+ * Netty Channel Initializer to register decoder and handler
+ */
+public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+ private ChannelInboundHandlerAdapter handler;
+
+ public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
+ this.handler = handler;
+ }
+
+ @Override
+ protected void initChannel(SocketChannel socketChannel) throws Exception {
+ socketChannel.pipeline().addLast(new ByteArrayDecoder());
+ socketChannel.pipeline().addLast(this.handler);
+ }
+
+}
diff --git
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java
new file mode 100644
index 0000000..b471b03
--- /dev/null
+++
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.tcp.server;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Netty Tcp Server to accept any incoming data through Tcp.
+ */
+public class NettyTcpServer {
+
+ private static final Logger logger = LoggerFactory.getLogger(NettyTcpServer.class);
+
+ private String host;
+ private int port;
+ private NettyTcpSource nettyTcpSource;
+ private int numberOfThreads;
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+
+ private NettyTcpServer(Builder builder) {
+ this.host = builder.host;
+ this.port = builder.port;
+ this.nettyTcpSource = builder.nettyTcpSource;
+ this.numberOfThreads = builder.numberOfThreads;
+ }
+
+ public void run() {
+ try {
+ bossGroup = new NioEventLoopGroup(this.numberOfThreads);
+ workerGroup = new NioEventLoopGroup(this.numberOfThreads);
+
+ ServerBootstrap serverBootstrap = new ServerBootstrap();
+ serverBootstrap.group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(new NettyChannelInitializer(new NettyTcpServerHandler(this.nettyTcpSource)))
+ .option(ChannelOption.SO_BACKLOG, 1024)
+ .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+ ChannelFuture channelFuture = serverBootstrap.bind(this.host, this.port).sync();
+ channelFuture.channel().closeFuture().sync();
+ } catch(Exception ex) {
+ logger.error("Error occurred when Netty Tcp Server is running", ex);
+ } finally {
+ shutdownGracefully();
+ }
+ }
+
+ public void shutdownGracefully() {
+ if (workerGroup != null)
+ workerGroup.shutdownGracefully();
+ if (bossGroup != null)
+ bossGroup.shutdownGracefully();
+ }
+
+ /**
+ * Pulsar Tcp Server Builder.
+ */
+ public static class Builder {
+
+ private String host;
+ private int port;
+ private NettyTcpSource nettyTcpSource;
+ private int numberOfThreads;
+
+ public Builder setHost(String host) {
+ this.host = host;
+ return this;
+ }
+
+ public Builder setPort(int port) {
+ this.port = port;
+ return this;
+ }
+
+ public Builder setNettyTcpSource(NettyTcpSource nettyTcpSource) {
+ this.nettyTcpSource = nettyTcpSource;
+ return this;
+ }
+
+ public Builder setNumberOfThreads(int numberOfThreads) {
+ this.numberOfThreads = numberOfThreads;
+ return this;
+ }
+
+ public NettyTcpServer build() {
+ Preconditions.checkArgument(StringUtils.isNotBlank(host), "host cannot be blank/null");
+ Preconditions.checkArgument(this.port >= 1024, "port must be set equal or bigger than 1024");
+ Preconditions.checkNotNull(this.nettyTcpSource, "nettyTcpSource must be set");
+ Preconditions.checkArgument(this.numberOfThreads > 0,
+ "numberOfThreads must be set as positive");
+
+ return new NettyTcpServer(this);
+ }
+ }
+
+}
\ No newline at end of file
diff --git
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java
new file mode 100644
index 0000000..2fd2cf3
--- /dev/null
+++
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.tcp.server;
+
+import io.netty.channel.*;
+import lombok.Data;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * Handles a server-side channel
+ */
+@ChannelHandler.Sharable
+public class NettyTcpServerHandler extends SimpleChannelInboundHandler<byte[]> {
+
+ private static final Logger logger = LoggerFactory.getLogger(NettyTcpServerHandler.class);
+
+ private NettyTcpSource nettyTcpSource;
+
+ public NettyTcpServerHandler(NettyTcpSource nettyTcpSource) {
+ this.nettyTcpSource = nettyTcpSource;
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext channelHandlerContext, byte[] bytes) throws Exception {
+ nettyTcpSource.consume(new NettyTcpRecord(Optional.ofNullable(""), bytes));
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ logger.error("Error when processing incoming data", cause);
+ ctx.close();
+ }
+
+ @Data
+ static private class NettyTcpRecord implements Record<byte[]>, Serializable {
+ private final Optional<String> key;
+ private final byte[] value;
+ }
+
+}
diff --git
a/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
b/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..58744ce
--- /dev/null
+++ b/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: tcp
+description: Netty Tcp Source Connector
+sourceClass: org.apache.pulsar.io.netty.NettyTcpSource
diff --git
a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java
b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java
new file mode 100644
index 0000000..efc763a
--- /dev/null
+++
b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty;
+
+import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for Netty Tcp Source Config
+ */
+public class NettyTcpSourceConfigTest {
+
+ private static final String LOCALHOST = "127.0.0.1";
+
+ @Test
+ public void testNettyTcpConfigLoadWithMap() throws IOException {
+ Map<String, Object> map = new HashMap<>();
+ map.put("host", LOCALHOST);
+ map.put("port", 10999);
+ map.put("numberOfThreads", 1);
+
+ NettyTcpSourceConfig nettyTcpSourceConfig = NettyTcpSourceConfig.load(map);
+ assertNotNull(nettyTcpSourceConfig);
+ assertEquals(LOCALHOST, nettyTcpSourceConfig.getHost());
+ assertEquals(10999, nettyTcpSourceConfig.getPort());
+ assertEquals(1, nettyTcpSourceConfig.getNumberOfThreads());
+ }
+
+ @Test(expected = UnrecognizedPropertyException.class)
+ public void testNettyTcpConfigLoadWithMapWhenInvalidPropertyIsSet() throws IOException {
+ Map<String, Object> map = new HashMap<>();
+ map.put("invalidProperty", 1);
+
+ NettyTcpSourceConfig.load(map);
+ }
+
+ @Test
+ public void testNettyTcpConfigLoadWithYamlFile() throws IOException {
+ File yamlFile = getFile("nettyTcpSourceConfig.yaml");
+ NettyTcpSourceConfig nettyTcpSourceConfig = NettyTcpSourceConfig.load(yamlFile.getAbsolutePath());
+ assertNotNull(nettyTcpSourceConfig);
+ assertEquals(LOCALHOST, nettyTcpSourceConfig.getHost());
+ assertEquals(10911, nettyTcpSourceConfig.getPort());
+ assertEquals(5, nettyTcpSourceConfig.getNumberOfThreads());
+ }
+
+ @Test(expected = UnrecognizedPropertyException.class)
+ public void testNettyTcpConfigLoadWithYamlFileWhenInvalidPropertyIsSet() throws IOException {
+ File yamlFile = getFile("nettyTcpSourceConfigWithInvalidProperty.yaml");
+ NettyTcpSourceConfig.load(yamlFile.getAbsolutePath());
+ }
+
+ private File getFile(String name) {
+ ClassLoader classLoader = getClass().getClassLoader();
+ return new File(classLoader.getResource(name).getFile());
+ }
+
+}
diff --git
a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java
b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java
new file mode 100644
index 0000000..a5243e6
--- /dev/null
+++
b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.tcp.server;
+
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for Netty Channel Initializer
+ */
+public class NettyChannelInitializerTest {
+
+ @Test
+ public void testChannelInitializer() throws Exception {
+ NioSocketChannel channel = new NioSocketChannel();
+
+ NettyChannelInitializer nettyChannelInitializer = new NettyChannelInitializer(
+ new NettyTcpServerHandler(new NettyTcpSource()));
+ nettyChannelInitializer.initChannel(channel);
+
+ assertNotNull(channel.pipeline().toMap());
+ assertEquals(2, channel.pipeline().toMap().size());
+ }
+
+}
diff --git
a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java
b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java
new file mode 100644
index 0000000..0c2f56b
--- /dev/null
+++
b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.tcp.server;
+
+import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for Netty Tcp Server
+ */
+public class NettyTcpServerTest {
+
+ private static final String LOCALHOST = "127.0.0.1";
+
+ @Test
+ public void testNettyTcpServerConstructor() {
+ NettyTcpServer nettyTcpServer = new NettyTcpServer.Builder()
+ .setHost(LOCALHOST)
+ .setPort(10999)
+ .setNumberOfThreads(2)
+ .setNettyTcpSource(new NettyTcpSource())
+ .build();
+
+ assertNotNull(nettyTcpServer);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNettyTcpServerConstructorWhenHostIsNotSet() {
+ new NettyTcpServer.Builder()
+ .setPort(10999)
+ .setNumberOfThreads(2)
+ .setNettyTcpSource(new NettyTcpSource())
+ .build();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNettyTcpServerConstructorWhenPortIsNotSet() {
+ new NettyTcpServer.Builder()
+ .setHost(LOCALHOST)
+ .setNumberOfThreads(2)
+ .setNettyTcpSource(new NettyTcpSource())
+ .build();
+ }
+
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNettyTcpServerConstructorWhenNumberOfThreadsIsNotSet() {
+ new NettyTcpServer.Builder()
+ .setHost(LOCALHOST)
+ .setPort(10999)
+ .setNettyTcpSource(new NettyTcpSource())
+ .build();
+ }
+
+
+ @Test(expected = NullPointerException.class)
+ public void testNettyTcpServerConstructorWhenNettyTcpSourceIsNotSet() {
+ new NettyTcpServer.Builder()
+ .setHost(LOCALHOST)
+ .setPort(10999)
+ .setNumberOfThreads(2)
+ .build();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNettyTcpServerWhenHostIsSetAsBlank() {
+ new NettyTcpServer.Builder()
+ .setHost(" ")
+ .setPort(10999)
+ .setNumberOfThreads(2)
+ .setNettyTcpSource(new NettyTcpSource())
+ .build();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNettyTcpServerWhenPortIsSetAsZero() {
+ new NettyTcpServer.Builder()
+ .setHost(LOCALHOST)
+ .setPort(0)
+ .setNumberOfThreads(2)
+ .setNettyTcpSource(new NettyTcpSource())
+ .build();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNettyTcpServerWhenPortIsSetLowerThan1024() {
+ new NettyTcpServer.Builder()
+ .setHost(LOCALHOST)
+ .setPort(1022)
+ .setNumberOfThreads(2)
+ .setNettyTcpSource(new NettyTcpSource())
+ .build();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNettyTcpServerWhenNumberOfThreadsIsSetAsZero() {
+ new NettyTcpServer.Builder()
+ .setHost(LOCALHOST)
+ .setPort(10999)
+ .setNumberOfThreads(0)
+ .setNettyTcpSource(new NettyTcpSource())
+ .build();
+ }
+
+}
diff --git a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml
b/pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml
new file mode 100644
index 0000000..ca748cc
--- /dev/null
+++ b/pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"host": "127.0.0.1",
+"port": "10911",
+"numberOfThreads": "5"
+}
\ No newline at end of file
diff --git
a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml
b/pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml
new file mode 100644
index 0000000..8a2bb92
--- /dev/null
+++
b/pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"host": "127.0.0.1",
+"port": "10911",
+"invalidProperty": "5"
+}
\ No newline at end of file
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 28a7f12..a9ff107 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -47,6 +47,7 @@
<module>kafka-connect-adaptor</module>
<module>debezium</module>
<module>canal</module>
+ <module>netty</module>
</modules>
</project>
diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md
index 92a19dd..0250ebc 100644
--- a/site2/docs/io-connectors.md
+++ b/site2/docs/io-connectors.md
@@ -17,3 +17,4 @@ Pulsar Functions cluster.
- [RabbitMQ Source Connector](io-rabbitmq.md#source)
- [Twitter Firehose Source Connector](io-twitter.md)
- [CDC Source Connector based on Debezium](io-cdc.md)
+- [Netty Tcp Source Connector](io-tcp.md#source)
diff --git a/site2/docs/io-tcp.md b/site2/docs/io-tcp.md
new file mode 100644
index 0000000..8bf3a89
--- /dev/null
+++ b/site2/docs/io-tcp.md
@@ -0,0 +1,19 @@
+---
+id: io-tcp
+title: Netty Tcp Connector
+sidebar_label: Netty Tcp Connector
+---
+
+## Source
+
+The Netty Tcp Source connector listens for TCP messages from TCP clients and writes them to a user-defined Pulsar topic.
+This connector is best run in a containerized (e.g. k8s) deployment.
+Otherwise, if the connector runs in process or thread mode, the instances may conflict over the ports they listen on.
+
+### Source Configuration Options
+
+| Name | Required | Default | Description |
+|------|----------|---------|-------------|
+| `host` | `false` | `127.0.0.1` | The host name or address that the source instance to listen on. |
+| `port` | `false` | `10999` | The port that the source instance to listen on. |
+| `numberOfThreads` | `false` | `1` | The number of threads of Netty Tcp Server to accept incoming connections and handle the traffic of the accepted connections. |