This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 576609b  [Pulsar-IO] Add Netty Tcp Source Support (#3179)
576609b is described below

commit 576609b01319f925a948c0754498edbdf880a86c
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Sun Dec 16 01:35:43 2018 +0000

    [Pulsar-IO] Add Netty Tcp Source Support (#3179)
    
    ### Motivation
    Netty is an NIO client-server framework that supports asynchronous, 
event-driven communication and custom protocol implementations.
    Ref: https://netty.io/
    
    This PR proposes a Pulsar-IO Netty Source Connector aimed at Tcp 
clients. It starts an embedded Tcp Server that listens for incoming Tcp 
messages and writes them to a user-defined Pulsar topic.
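
To exercise such a source from the outside, a plain TCP client is enough. The sketch below is illustrative and not part of this commit: the class name `TcpSourceClientSketch` and its `send` helper are hypothetical, and the address `127.0.0.1:10999` is taken from the defaults declared in `NettyTcpSourceConfig`. Because the channel pipeline registers only a `ByteArrayDecoder` and no frame decoder, how the sent bytes are split into Pulsar records likely depends on how the TCP reads arrive.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class, not part of the connector.
class TcpSourceClientSketch {

    // Opens a TCP connection, writes the payload as UTF-8 bytes, closes the
    // connection, and returns the number of bytes written.
    static int send(String host, int port, String payload) throws IOException {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        try (Socket socket = new Socket(host, port);
             OutputStream out = socket.getOutputStream()) {
            out.write(bytes);
            out.flush();
        }
        return bytes.length;
    }

    public static void main(String[] args) {
        // Assumes a source instance is listening on the default host and port.
        try {
            int sent = send("127.0.0.1", 10999, "hello pulsar");
            System.out.println("sent " + sent + " bytes");
        } catch (IOException e) {
            System.out.println("could not reach the Tcp source: " + e.getMessage());
        }
    }
}
```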
    
    There are also other potential use cases (Tcp, Http and Udp messages) for 
this module, as follows:
    - Tcp Client (Pulsar-IO Sink): It can listen for Pulsar messages and write 
them to a remote Tcp Server.
    - Http Server and Client (Pulsar-IO Source and Sink)
    - Udp Server and Client (Pulsar-IO Source and Sink)
    
    This PR follows up on #3095. The module has been moved to Pulsar-IO in 
light of the previous discussion.
    
    ### Modifications
    1- `NettyTcpServer`: Initializes an embedded Tcp Server that listens for 
incoming Tcp requests
    2- `NettyTcpServerHandler`: Inbound Channel Handler that handles incoming 
Tcp requests
    3- `NettyChannelInitializer`: Channel Initializer that supports different 
types of decoders and handlers
    4- `NettyTcpSource`: A push-based Source that listens for Tcp messages and 
writes them to a user-defined Pulsar topic
    5- `NettyTcpSourceConfig`: Supports user-defined config loaded from either 
a Map or a Yaml file.
    6- Unit test coverage
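
The configuration rules spread across `NettyTcpSourceConfig` and `NettyTcpServer.Builder#build` can be summarized in a small dependency-free sketch. It is an approximation under stated assumptions, not the connector's code: `TcpSourceConfigSketch` and `withDefaultsAndChecks` are hypothetical names, the defaults and checks are copied from the diff below, and the Jackson behavior of rejecting unknown properties is not modeled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class, not part of the connector.
class TcpSourceConfigSketch {

    // Overlays user-supplied values on the defaults from NettyTcpSourceConfig,
    // then applies the same checks as NettyTcpServer.Builder#build.
    static Map<String, Object> withDefaultsAndChecks(Map<String, Object> userConfig) {
        Map<String, Object> config = new HashMap<>();
        config.put("host", "127.0.0.1");
        config.put("port", 10999);
        config.put("numberOfThreads", 1);
        config.putAll(userConfig);

        String host = (String) config.get("host");
        int port = (Integer) config.get("port");
        int numberOfThreads = (Integer) config.get("numberOfThreads");

        if (host == null || host.trim().isEmpty()) {
            throw new IllegalArgumentException("host cannot be blank/null");
        }
        if (port < 1024) {
            throw new IllegalArgumentException("port must be 1024 or greater");
        }
        if (numberOfThreads <= 0) {
            throw new IllegalArgumentException("numberOfThreads must be positive");
        }
        return config;
    }

    public static void main(String[] args) {
        // Same values as the test resource nettyTcpSourceConfig.yaml asserts on.
        Map<String, Object> user = new HashMap<>();
        user.put("port", 10911);
        user.put("numberOfThreads", 5);
        System.out.println(withDefaultsAndChecks(user));
    }
}
```

Note that in the actual connector a missing `NettyTcpSource` raises a `NullPointerException` via `Preconditions.checkNotNull`, while the other violations raise `IllegalArgumentException`, as the unit tests in this commit expect.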
---
 pulsar-io/netty/pom.xml                            |  86 ++++++++++++++
 .../org/apache/pulsar/io/netty/NettyTcpSource.java |  77 +++++++++++++
 .../pulsar/io/netty/NettyTcpSourceConfig.java      |  74 +++++++++++++
 .../netty/tcp/server/NettyChannelInitializer.java  |  43 +++++++
 .../pulsar/io/netty/tcp/server/NettyTcpServer.java | 123 +++++++++++++++++++++
 .../io/netty/tcp/server/NettyTcpServerHandler.java |  62 +++++++++++
 .../resources/META-INF/services/pulsar-io.yaml     |  22 ++++
 .../pulsar/io/netty/NettyTcpSourceConfigTest.java  |  82 ++++++++++++++
 .../tcp/server/NettyChannelInitializerTest.java    |  45 ++++++++
 .../io/netty/tcp/server/NettyTcpServerTest.java    | 123 +++++++++++++++++++++
 .../src/test/resources/nettyTcpSourceConfig.yaml   |  24 ++++
 .../nettyTcpSourceConfigWithInvalidProperty.yaml   |  24 ++++
 pulsar-io/pom.xml                                  |   1 +
 site2/docs/io-connectors.md                        |   1 +
 site2/docs/io-tcp.md                               |  19 ++++
 15 files changed, 806 insertions(+)

diff --git a/pulsar-io/netty/pom.xml b/pulsar-io/netty/pom.xml
new file mode 100644
index 0000000..40dabc9
--- /dev/null
+++ b/pulsar-io/netty/pom.xml
@@ -0,0 +1,86 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd"
+        xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.pulsar</groupId>
+        <artifactId>pulsar-io</artifactId>
+        <version>2.3.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>pulsar-io-netty</artifactId>
+    <name>Pulsar IO :: Netty</name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-io-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-yaml</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>${netty.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-nar-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java
new file mode 100644
index 0000000..3cef730
--- /dev/null
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty;
+
+import org.apache.pulsar.io.core.PushSource;
+import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.apache.pulsar.io.netty.tcp.server.NettyTcpServer;
+import java.util.Map;
+
+/**
+ * A simple Netty Tcp Source connector that listens for Tcp messages and 
writes them to a user-defined Pulsar topic
+ */
+@Connector(
+        name = "tcp",
+        type = IOType.SOURCE,
+        help = "A simple Netty Tcp Source connector that listens for Tcp 
messages and writes them to a user-defined Pulsar topic",
+        configClass = NettyTcpSourceConfig.class)
+public class NettyTcpSource extends PushSource<byte[]> {
+
+    private NettyTcpServer nettyTcpServer;
+    private Thread thread;
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
+        NettyTcpSourceConfig nettyTcpSourceConfig = 
NettyTcpSourceConfig.load(config);
+
+        thread = new Thread(new PulsarTcpServerRunnable(nettyTcpSourceConfig, 
this));
+        thread.start();
+    }
+
+    @Override
+    public void close() throws Exception {
+        nettyTcpServer.shutdownGracefully();
+    }
+
+    private class PulsarTcpServerRunnable implements Runnable {
+
+        private NettyTcpSourceConfig nettyTcpSourceConfig;
+        private NettyTcpSource nettyTcpSource;
+
+        public PulsarTcpServerRunnable(NettyTcpSourceConfig 
nettyTcpSourceConfig, NettyTcpSource nettyTcpSource) {
+            this.nettyTcpSourceConfig = nettyTcpSourceConfig;
+            this.nettyTcpSource = nettyTcpSource;
+        }
+
+        @Override
+        public void run() {
+            nettyTcpServer = new NettyTcpServer.Builder()
+                .setHost(nettyTcpSourceConfig.getHost())
+                .setPort(nettyTcpSourceConfig.getPort())
+                .setNumberOfThreads(nettyTcpSourceConfig.getNumberOfThreads())
+                .setNettyTcpSource(nettyTcpSource)
+                .build();
+
+            nettyTcpServer.run();
+        }
+    }
+
+}
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java
new file mode 100644
index 0000000..07b3cf8
--- /dev/null
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.*;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Netty Tcp Source Connector Config.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class NettyTcpSourceConfig implements Serializable {
+
+    private static final long serialVersionUID = -7116130435021510496L;
+
+    @FieldDoc(
+            required = true,
+            defaultValue = "127.0.0.1",
+            help = "The host name or address that the source instance 
listens on")
+    private String host = "127.0.0.1";
+
+    @FieldDoc(
+            required = true,
+            defaultValue = "10999",
+            help = "The port that the source instance listens on")
+    private int port = 10999;
+
+    @FieldDoc(
+            required = true,
+            defaultValue = "1",
+            help = "The number of threads of Netty Tcp Server to accept 
incoming connections and " +
+                    "handle the traffic of the accepted connections")
+    private int numberOfThreads = 1;
+
+    public static NettyTcpSourceConfig load(Map<String, Object> map) throws 
IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
NettyTcpSourceConfig.class);
+    }
+
+    public static NettyTcpSourceConfig load(String yamlFile) throws 
IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), 
NettyTcpSourceConfig.class);
+    }
+
+}
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java
new file mode 100644
index 0000000..f88f514
--- /dev/null
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.tcp.server;
+
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.bytes.ByteArrayDecoder;
+
+/**
+ * Netty Channel Initializer to register decoder and handler
+ */
+public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> 
{
+
+    private ChannelInboundHandlerAdapter handler;
+
+    public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
+        this.handler = handler;
+    }
+
+    @Override
+    protected void initChannel(SocketChannel socketChannel) throws Exception {
+        socketChannel.pipeline().addLast(new ByteArrayDecoder());
+        socketChannel.pipeline().addLast(this.handler);
+    }
+
+}
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java
new file mode 100644
index 0000000..b471b03
--- /dev/null
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.tcp.server;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Netty Tcp Server to accept any incoming data through Tcp.
+ */
+public class NettyTcpServer {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(NettyTcpServer.class);
+
+    private String host;
+    private int port;
+    private NettyTcpSource nettyTcpSource;
+    private int numberOfThreads;
+    private EventLoopGroup bossGroup;
+    private EventLoopGroup workerGroup;
+
+    private NettyTcpServer(Builder builder) {
+        this.host = builder.host;
+        this.port = builder.port;
+        this.nettyTcpSource = builder.nettyTcpSource;
+        this.numberOfThreads = builder.numberOfThreads;
+    }
+
+    public void run() {
+        try {
+            bossGroup = new NioEventLoopGroup(this.numberOfThreads);
+            workerGroup = new NioEventLoopGroup(this.numberOfThreads);
+
+            ServerBootstrap serverBootstrap = new ServerBootstrap();
+            serverBootstrap.group(bossGroup, workerGroup)
+                    .channel(NioServerSocketChannel.class)
+                    .childHandler(new NettyChannelInitializer(new 
NettyTcpServerHandler(this.nettyTcpSource)))
+                    .option(ChannelOption.SO_BACKLOG, 1024)
+                    .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+            ChannelFuture channelFuture = serverBootstrap.bind(this.host, 
this.port).sync();
+            channelFuture.channel().closeFuture().sync();
+        } catch(Exception ex) {
+            logger.error("Error occurred when Netty Tcp Server is running", 
ex);
+        } finally {
+            shutdownGracefully();
+        }
+    }
+
+    public void shutdownGracefully() {
+        if (workerGroup != null)
+            workerGroup.shutdownGracefully();
+        if (bossGroup != null)
+            bossGroup.shutdownGracefully();
+    }
+
+    /**
+     * Pulsar Tcp Server Builder.
+     */
+    public static class Builder {
+
+        private String host;
+        private int port;
+        private NettyTcpSource nettyTcpSource;
+        private int numberOfThreads;
+
+        public Builder setHost(String host) {
+            this.host = host;
+            return this;
+        }
+
+        public Builder setPort(int port) {
+            this.port = port;
+            return this;
+        }
+
+        public Builder setNettyTcpSource(NettyTcpSource nettyTcpSource) {
+            this.nettyTcpSource = nettyTcpSource;
+            return this;
+        }
+
+        public Builder setNumberOfThreads(int numberOfThreads) {
+            this.numberOfThreads = numberOfThreads;
+            return this;
+        }
+
+        public NettyTcpServer build() {
+            Preconditions.checkArgument(StringUtils.isNotBlank(host), "host 
cannot be blank/null");
+            Preconditions.checkArgument(this.port >= 1024, "port must be 
greater than or equal to 1024");
+            Preconditions.checkNotNull(this.nettyTcpSource, "nettyTcpSource 
must be set");
+            Preconditions.checkArgument(this.numberOfThreads > 0,
+                    "numberOfThreads must be set as positive");
+
+            return new NettyTcpServer(this);
+        }
+    }
+
+}
\ No newline at end of file
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java
new file mode 100644
index 0000000..2fd2cf3
--- /dev/null
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.tcp.server;
+
+import io.netty.channel.*;
+import lombok.Data;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * Handles a server-side channel
+ */
+@ChannelHandler.Sharable
+public class NettyTcpServerHandler extends SimpleChannelInboundHandler<byte[]> 
{
+
+    private static final Logger logger = 
LoggerFactory.getLogger(NettyTcpServerHandler.class);
+
+    private NettyTcpSource nettyTcpSource;
+
+    public NettyTcpServerHandler(NettyTcpSource nettyTcpSource) {
+        this.nettyTcpSource = nettyTcpSource;
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext channelHandlerContext, 
byte[] bytes) throws Exception {
+        nettyTcpSource.consume(new NettyTcpRecord(Optional.ofNullable(""), 
bytes));
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        logger.error("Error when processing incoming data", cause);
+        ctx.close();
+    }
+
+    @Data
+    static private class NettyTcpRecord implements Record<byte[]>, 
Serializable {
+        private final Optional<String> key;
+        private final byte[] value;
+    }
+
+}
diff --git 
a/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..58744ce
--- /dev/null
+++ b/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: tcp
+description: Netty Tcp Source Connector
+sourceClass: org.apache.pulsar.io.netty.NettyTcpSource
diff --git 
a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java
 
b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java
new file mode 100644
index 0000000..efc763a
--- /dev/null
+++ 
b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty;
+
+import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for Netty Tcp Source Config
+ */
+public class NettyTcpSourceConfigTest {
+
+    private static final String LOCALHOST = "127.0.0.1";
+
+    @Test
+    public void testNettyTcpConfigLoadWithMap() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("host", LOCALHOST);
+        map.put("port", 10999);
+        map.put("numberOfThreads", 1);
+
+        NettyTcpSourceConfig nettyTcpSourceConfig = 
NettyTcpSourceConfig.load(map);
+        assertNotNull(nettyTcpSourceConfig);
+        assertEquals(LOCALHOST, nettyTcpSourceConfig.getHost());
+        assertEquals(10999, nettyTcpSourceConfig.getPort());
+        assertEquals(1, nettyTcpSourceConfig.getNumberOfThreads());
+    }
+
+    @Test(expected = UnrecognizedPropertyException.class)
+    public void testNettyTcpConfigLoadWithMapWhenInvalidPropertyIsSet() throws 
IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("invalidProperty", 1);
+
+        NettyTcpSourceConfig.load(map);
+    }
+
+    @Test
+    public void testNettyTcpConfigLoadWithYamlFile() throws IOException {
+        File yamlFile = getFile("nettyTcpSourceConfig.yaml");
+        NettyTcpSourceConfig nettyTcpSourceConfig = 
NettyTcpSourceConfig.load(yamlFile.getAbsolutePath());
+        assertNotNull(nettyTcpSourceConfig);
+        assertEquals(LOCALHOST, nettyTcpSourceConfig.getHost());
+        assertEquals(10911, nettyTcpSourceConfig.getPort());
+        assertEquals(5, nettyTcpSourceConfig.getNumberOfThreads());
+    }
+
+    @Test(expected = UnrecognizedPropertyException.class)
+    public void testNettyTcpConfigLoadWithYamlFileWhenInvalidPropertyIsSet() 
throws IOException {
+        File yamlFile = 
getFile("nettyTcpSourceConfigWithInvalidProperty.yaml");
+        NettyTcpSourceConfig.load(yamlFile.getAbsolutePath());
+    }
+
+    private File getFile(String name) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        return new File(classLoader.getResource(name).getFile());
+    }
+
+}
diff --git 
a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java
 
b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java
new file mode 100644
index 0000000..a5243e6
--- /dev/null
+++ 
b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.tcp.server;
+
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for Netty Channel Initializer
+ */
+public class NettyChannelInitializerTest {
+
+    @Test
+    public void testChannelInitializer() throws Exception {
+        NioSocketChannel channel = new NioSocketChannel();
+
+        NettyChannelInitializer nettyChannelInitializer = new 
NettyChannelInitializer(
+                new NettyTcpServerHandler(new NettyTcpSource()));
+        nettyChannelInitializer.initChannel(channel);
+
+        assertNotNull(channel.pipeline().toMap());
+        assertEquals(2, channel.pipeline().toMap().size());
+    }
+
+}
diff --git 
a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java
 
b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java
new file mode 100644
index 0000000..0c2f56b
--- /dev/null
+++ 
b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.tcp.server;
+
+import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for Netty Tcp Server
+ */
+public class NettyTcpServerTest {
+
+    private static final String LOCALHOST = "127.0.0.1";
+
+    @Test
+    public void testNettyTcpServerConstructor() {
+        NettyTcpServer nettyTcpServer = new NettyTcpServer.Builder()
+                .setHost(LOCALHOST)
+                .setPort(10999)
+                .setNumberOfThreads(2)
+                .setNettyTcpSource(new NettyTcpSource())
+                .build();
+
+        assertNotNull(nettyTcpServer);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNettyTcpServerConstructorWhenHostIsNotSet() {
+        new NettyTcpServer.Builder()
+                .setPort(10999)
+                .setNumberOfThreads(2)
+                .setNettyTcpSource(new NettyTcpSource())
+                .build();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNettyTcpServerConstructorWhenPortIsNotSet() {
+        new NettyTcpServer.Builder()
+                .setHost(LOCALHOST)
+                .setNumberOfThreads(2)
+                .setNettyTcpSource(new NettyTcpSource())
+                .build();
+    }
+
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNettyTcpServerConstructorWhenNumberOfThreadsIsNotSet() {
+        new NettyTcpServer.Builder()
+                .setHost(LOCALHOST)
+                .setPort(10999)
+                .setNettyTcpSource(new NettyTcpSource())
+                .build();
+    }
+
+
+    @Test(expected = NullPointerException.class)
+    public void testNettyTcpServerConstructorWhenNettyTcpSourceIsNotSet() {
+        new NettyTcpServer.Builder()
+                .setHost(LOCALHOST)
+                .setPort(10999)
+                .setNumberOfThreads(2)
+                .build();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNettyTcpServerWhenHostIsSetAsBlank() {
+        new NettyTcpServer.Builder()
+                .setHost(" ")
+                .setPort(10999)
+                .setNumberOfThreads(2)
+                .setNettyTcpSource(new NettyTcpSource())
+                .build();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNettyTcpServerWhenPortIsSetAsZero() {
+        new NettyTcpServer.Builder()
+                .setHost(LOCALHOST)
+                .setPort(0)
+                .setNumberOfThreads(2)
+                .setNettyTcpSource(new NettyTcpSource())
+                .build();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNettyTcpServerWhenPortIsSetLowerThan1024() {
+        new NettyTcpServer.Builder()
+                .setHost(LOCALHOST)
+                .setPort(1022)
+                .setNumberOfThreads(2)
+                .setNettyTcpSource(new NettyTcpSource())
+                .build();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNettyTcpServerWhenNumberOfThreadsIsSetAsZero() {
+        new NettyTcpServer.Builder()
+                .setHost(LOCALHOST)
+                .setPort(10999)
+                .setNumberOfThreads(0)
+                .setNettyTcpSource(new NettyTcpSource())
+                .build();
+    }
+
+}
diff --git a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml b/pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml
new file mode 100644
index 0000000..ca748cc
--- /dev/null
+++ b/pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"host": "127.0.0.1",
+"port": "10911",
+"numberOfThreads": "5"
+}
\ No newline at end of file
diff --git a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml b/pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml
new file mode 100644
index 0000000..8a2bb92
--- /dev/null
+++ b/pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"host": "127.0.0.1",
+"port": "10911",
+"invalidProperty": "5"
+}
\ No newline at end of file
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 28a7f12..a9ff107 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -47,6 +47,7 @@
     <module>kafka-connect-adaptor</module>
     <module>debezium</module>
     <module>canal</module>
+    <module>netty</module>
   </modules>
 
 </project>
diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md
index 92a19dd..0250ebc 100644
--- a/site2/docs/io-connectors.md
+++ b/site2/docs/io-connectors.md
@@ -17,3 +17,4 @@ Pulsar Functions cluster.
 - [RabbitMQ Source Connector](io-rabbitmq.md#source)
 - [Twitter Firehose Source Connector](io-twitter.md)
 - [CDC Source Connector based on Debezium](io-cdc.md)
+- [Netty Tcp Source Connector](io-tcp.md#source)
diff --git a/site2/docs/io-tcp.md b/site2/docs/io-tcp.md
new file mode 100644
index 0000000..8bf3a89
--- /dev/null
+++ b/site2/docs/io-tcp.md
@@ -0,0 +1,19 @@
+---
+id: io-tcp
+title: Netty Tcp Connector
+sidebar_label: Netty Tcp Connector
+---
+
+## Source
+
+The Netty Tcp Source connector listens for Tcp messages from Tcp clients and writes them to a user-defined Pulsar topic.
+This connector is best run in a containerized (e.g. k8s) deployment.
+Otherwise, if the connector runs in process or thread mode, the instances may conflict over the ports they listen on.
+
+### Source Configuration Options
+
+| Name | Required | Default | Description |
+|------|----------|---------|-------------|
+| `host` | `false` | `127.0.0.1` | The host name or address that the source instance listens on. |
+| `port` | `false` | `10999` | The port that the source instance listens on. |
+| `numberOfThreads` | `false` | `1` | The number of Netty Tcp Server threads used to accept incoming connections and handle the traffic of the accepted connections. |
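
For illustration only (not part of the commit): a minimal sketch of a Tcp client that could feed the source described above, assuming the connector is running with the default `host` (`127.0.0.1`) and `port` (`10999`) from the table. The class name `TcpSourceClientSketch` and its methods are hypothetical. The docs in this commit do not say how messages are framed, so the sketch sends the raw UTF-8 bytes of one message and then closes the connection.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the pulsar-io-netty module.
class TcpSourceClientSketch {

    // Encodes the message as UTF-8 bytes. No delimiter or length prefix is added;
    // whether the connector expects one is an assumption left open here.
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // Opens a Tcp connection, writes one message, and closes the connection.
    static void send(String host, int port, String message) throws IOException {
        try (Socket socket = new Socket(host, port);
             OutputStream out = socket.getOutputStream()) {
            out.write(encode(message));
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumes a source instance is listening on the documented defaults.
        send("127.0.0.1", 10999, "hello pulsar");
    }
}
```

Running `main` requires a listening source instance; without one, `send` fails with a connection error.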
