This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 7ec205a Added HTTP Support (#3336) 7ec205a is described below commit 7ec205ad5733bbbc9d99da16ed4721221bb5b315 Author: David Kjerrumgaard <35466513+david-stream...@users.noreply.github.com> AuthorDate: Fri Jan 11 13:27:36 2019 -0800 Added HTTP Support (#3336) * Added HTTP Support * Updated documentation and removed duplicate code * Added unit test for NettyHttpChannelInitializer --- .../org/apache/pulsar/io/netty/NettySource.java | 7 +- .../apache/pulsar/io/netty/NettySourceConfig.java | 19 ++- .../NettyHttpChannelInitializer.java} | 19 ++- .../io/netty/http/NettyHttpServerHandler.java | 144 +++++++++++++++++++++ .../package-info.java} | 26 +--- ...tyChannelInitializer.java => package-info.java} | 26 +--- .../io/netty/server/NettyChannelInitializer.java | 2 +- .../apache/pulsar/io/netty/server/NettyServer.java | 70 ++++++---- .../pulsar/io/netty/server/NettyServerHandler.java | 13 +- ...tyChannelInitializer.java => package-info.java} | 26 +--- .../http/NettyHttpChannelInitializerTest.java} | 32 ++--- site2/docs/io-connectors.md | 2 +- site2/docs/io-netty.md | 4 +- 13 files changed, 250 insertions(+), 140 deletions(-) diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java index 215bd34..1e799d8 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java @@ -18,20 +18,21 @@ */ package org.apache.pulsar.io.netty; +import java.util.Map; + import org.apache.pulsar.io.core.PushSource; import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.annotations.Connector; import org.apache.pulsar.io.core.annotations.IOType; import org.apache.pulsar.io.netty.server.NettyServer; -import java.util.Map; /** - * A simple Netty Tcp or Udp Source connector to listen Tcp/Udp messages and write to user-defined Pulsar topic + * A simple Netty Source connector to listen for incoming messages and write to user-defined Pulsar topic. */ @Connector( name = "netty", type = IOType.SOURCE, - help = "A simple Netty Tcp or Udp Source connector to listen Tcp/Udp messages and write to user-defined Pulsar topic", + help = "A simple Netty Source connector to listen for incoming messages and write to user-defined Pulsar topic", configClass = NettySourceConfig.class) public class NettySource extends PushSource<byte[]> { diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java index f5d40e9..1ef4c35 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java @@ -20,17 +20,22 @@ package org.apache.pulsar.io.netty; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import lombok.*; -import lombok.experimental.Accessors; -import org.apache.pulsar.io.core.annotations.FieldDoc; import java.io.File; import java.io.IOException; import java.io.Serializable; import java.util.Map; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.experimental.Accessors; +import org.apache.pulsar.io.core.annotations.FieldDoc; + /** - * Netty Tcp or Udp Source Connector Config. + * Netty Source Connector Config. */ @Data @Setter @@ -45,7 +50,7 @@ public class NettySourceConfig implements Serializable { @FieldDoc( required = true, defaultValue = "tcp", - help = "The tcp or udp network protocols") + help = "The network protocol to use, supported values are 'tcp', 'udp', and 'http'") private String type = "tcp"; @FieldDoc( @@ -63,8 +68,8 @@ public class NettySourceConfig implements Serializable { @FieldDoc( required = true, defaultValue = "1", - help = "The number of threads of Netty Tcp Server to accept incoming connections and " + - "handle the traffic of the accepted connections") + help = "The number of threads of Netty Tcp Server to accept incoming connections and " + + "handle the traffic of the accepted connections") private int numberOfThreads = 1; public static NettySourceConfig load(Map<String, Object> map) throws IOException { diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/NettyHttpChannelInitializer.java similarity index 65% copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/NettyHttpChannelInitializer.java index b5fa820..dce26fb 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/NettyHttpChannelInitializer.java @@ -16,28 +16,33 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.netty.server; +package org.apache.pulsar.io.netty.http; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.bytes.ByteArrayDecoder; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.ssl.SslContext; /** - * Netty Channel Initializer to register decoder and handler + * Netty Channel Initializer to register HTTP decoder and handler. */ -public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { +public class NettyHttpChannelInitializer extends ChannelInitializer<SocketChannel> { + private final SslContext sslCtx; private ChannelInboundHandlerAdapter handler; - public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) { + public NettyHttpChannelInitializer(ChannelInboundHandlerAdapter handler, SslContext sslCtx) { this.handler = handler; + this.sslCtx = sslCtx; } @Override protected void initChannel(SocketChannel socketChannel) throws Exception { - socketChannel.pipeline().addLast(new ByteArrayDecoder()); + if (sslCtx != null) { + socketChannel.pipeline().addLast(sslCtx.newHandler(socketChannel.alloc())); + } + socketChannel.pipeline().addLast(new HttpServerCodec()); socketChannel.pipeline().addLast(this.handler); } - } diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/NettyHttpServerHandler.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/NettyHttpServerHandler.java new file mode 100644 index 0000000..ef4cf8c --- /dev/null +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/NettyHttpServerHandler.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.netty.http; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpObject; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.CharsetUtil; + +import java.io.Serializable; +import java.util.Optional; + +import lombok.Data; + +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.netty.NettySource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles a server-side HTTP channel. + */ +@ChannelHandler.Sharable +public class NettyHttpServerHandler extends SimpleChannelInboundHandler<Object> { + + private static final Logger logger = LoggerFactory.getLogger(NettyHttpServerHandler.class); + + private NettySource nettySource; + + public NettyHttpServerHandler(NettySource nettySource) { + this.nettySource = nettySource; + } + + private HttpRequest request; + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) { + + if (msg instanceof HttpRequest) { + HttpRequest request = this.request = (HttpRequest) msg; + + if (HttpUtil.is100ContinueExpected(request)) { + send100Continue(ctx); + } + } + + if (msg instanceof HttpContent) { + HttpContent httpContent = (HttpContent) msg; + + ByteBuf content = httpContent.content(); + if (content.isReadable()) { + nettySource.consume(new NettyHttpRecord(Optional.ofNullable(""), + content.toString(CharsetUtil.UTF_8).getBytes())); + } + + if (msg instanceof LastHttpContent) { + LastHttpContent trailer = (LastHttpContent) msg; + + if (!writeResponse(trailer, ctx)) { + // If keep-alive is off, close the connection once the content is fully written. + ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); + } + } + } + } + + private boolean writeResponse(HttpObject currentObj, ChannelHandlerContext ctx) { + // Decide whether to close the connection or not. + boolean keepAlive = HttpUtil.isKeepAlive(request); + // Build the response object. + FullHttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, + currentObj.decoderResult().isSuccess() ? HttpResponseStatus.OK : HttpResponseStatus.BAD_REQUEST, + Unpooled.EMPTY_BUFFER); + + response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8"); + + if (keepAlive) { + // Add 'Content-Length' header only for a keep-alive connection. + response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); + // Add keep alive header as per: + // - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + } + + // Write the response. + ctx.write(response); + + return keepAlive; + } + + private static void send100Continue(ChannelHandlerContext ctx) { + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE); + ctx.write(response); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + logger.error("Error when processing incoming data", cause); + ctx.close(); + } + + @Data + static private class NettyHttpRecord implements Record<byte[]>, Serializable { + private final Optional<String> key; + private final byte[] value; + } +} diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/package-info.java similarity index 50% copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/package-info.java index b5fa820..2e71305 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/package-info.java @@ -16,28 +16,4 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.netty.server; - -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.bytes.ByteArrayDecoder; - -/** - * Netty Channel Initializer to register decoder and handler - */ -public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { - - private ChannelInboundHandlerAdapter handler; - - public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) { - this.handler = handler; - } - - @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { - socketChannel.pipeline().addLast(new ByteArrayDecoder()); - socketChannel.pipeline().addLast(this.handler); - } - -} +package org.apache.pulsar.io.netty.http; \ No newline at end of file diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/package-info.java similarity index 50% copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/package-info.java index b5fa820..482e05a 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/package-info.java @@ -16,28 +16,4 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.netty.server; - -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.bytes.ByteArrayDecoder; - -/** - * Netty Channel Initializer to register decoder and handler - */ -public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { - - private ChannelInboundHandlerAdapter handler; - - public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) { - this.handler = handler; - } - - @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { - socketChannel.pipeline().addLast(new ByteArrayDecoder()); - socketChannel.pipeline().addLast(this.handler); - } - -} +package org.apache.pulsar.io.netty; \ No newline at end of file diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java index b5fa820..b9a7b4c 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java @@ -24,7 +24,7 @@ import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.bytes.ByteArrayDecoder; /** - * Netty Channel Initializer to register decoder and handler + * Netty Channel Initializer to register decoder and handler. */ public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java index 2cb8031..775b6f4 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java @@ -18,10 +18,13 @@ */ package org.apache.pulsar.io.netty.server; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; @@ -29,11 +32,13 @@ import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.io.netty.NettySource; +import org.apache.pulsar.io.netty.http.NettyHttpChannelInitializer; +import org.apache.pulsar.io.netty.http.NettyHttpServerHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Netty Tcp or Udp Server to accept any incoming data through Tcp. + * Netty Server to accept incoming data via the configured type. */ public class NettyServer { @@ -58,28 +63,32 @@ public class NettyServer { public void run() { try { switch (type) { - case TCP: - runTcp(); - break; case UDP: runUdp(); break; + case HTTP: + runHttp(); + break; + case TCP: default: runTcp(); break; } - } catch(Exception ex) { - logger.error("Error occurred when Netty Tcp or Udp Server is running", ex); + } catch (Exception ex) { + logger.error("Error occurred when Netty Server is running", ex); } finally { shutdownGracefully(); } } public void shutdownGracefully() { - if (workerGroup != null) + if (workerGroup != null) { workerGroup.shutdownGracefully(); - if (bossGroup != null) + } + + if (bossGroup != null) { bossGroup.shutdownGracefully(); + } } private void runUdp() throws InterruptedException { @@ -95,17 +104,32 @@ public class NettyServer { } private void runTcp() throws InterruptedException { + ServerBootstrap serverBootstrap = getServerBootstrap( + new NettyChannelInitializer(new NettyServerHandler(this.nettySource))); + + ChannelFuture channelFuture = serverBootstrap.bind(this.host, this.port).sync(); + channelFuture.channel().closeFuture().sync(); + } + + private void runHttp() throws InterruptedException { + ServerBootstrap serverBootstrap = getServerBootstrap( + new NettyHttpChannelInitializer(new NettyHttpServerHandler(this.nettySource), null)); + + ChannelFuture channelFuture = serverBootstrap.bind(this.host, this.port).sync(); + channelFuture.channel().closeFuture().sync(); + } + + private ServerBootstrap getServerBootstrap(ChannelHandler childHandler) { bossGroup = new NioEventLoopGroup(this.numberOfThreads); workerGroup = new NioEventLoopGroup(this.numberOfThreads); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup); serverBootstrap.channel(NioServerSocketChannel.class); - serverBootstrap.childHandler(new NettyChannelInitializer(new NettyServerHandler(this.nettySource))) - .option(ChannelOption.SO_BACKLOG, 1024) - .childOption(ChannelOption.SO_KEEPALIVE, true); + serverBootstrap.childHandler(childHandler) + .option(ChannelOption.SO_BACKLOG, 1024) + .childOption(ChannelOption.SO_KEEPALIVE, true); - ChannelFuture channelFuture = serverBootstrap.bind(this.host, this.port).sync(); - channelFuture.channel().closeFuture().sync(); + return serverBootstrap; } /** @@ -145,11 +169,11 @@ public class NettyServer { } public NettyServer build() { - Preconditions.checkNotNull(this.type, "type cannot be blank/null"); - Preconditions.checkArgument(StringUtils.isNotBlank(host), "host cannot be blank/null"); - Preconditions.checkArgument(this.port >= 1024, "port must be set equal or bigger than 1024"); - Preconditions.checkNotNull(this.nettySource, "nettySource must be set"); - Preconditions.checkArgument(this.numberOfThreads > 0, + checkNotNull(this.type, "type cannot be blank/null"); + checkArgument(StringUtils.isNotBlank(host), "host cannot be blank/null"); + checkArgument(this.port >= 1024, "port must be set equal or bigger than 1024"); + checkNotNull(this.nettySource, "nettySource must be set"); + checkArgument(this.numberOfThreads > 0, "numberOfThreads must be set as positive"); return new NettyServer(this); @@ -157,13 +181,11 @@ public class NettyServer { } /** - * tcp or udp network protocol + * Network protocol. */ public enum Type { - TCP, - - UDP + UDP, + HTTP } - } diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java index 81fd203..42f497e 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java @@ -18,18 +18,21 @@ */ package org.apache.pulsar.io.netty.server; -import io.netty.channel.*; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; + +import java.io.Serializable; +import java.util.Optional; + import lombok.Data; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.netty.NettySource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; -import java.util.Optional; - /** - * Handles a server-side channel + * Handles a server-side channel. */ @ChannelHandler.Sharable public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> { diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/package-info.java similarity index 50% copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/package-info.java index b5fa820..5a1ab94 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/package-info.java @@ -16,28 +16,4 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.netty.server; - -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.bytes.ByteArrayDecoder; - -/** - * Netty Channel Initializer to register decoder and handler - */ -public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { - - private ChannelInboundHandlerAdapter handler; - - public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) { - this.handler = handler; - } - - @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { - socketChannel.pipeline().addLast(new ByteArrayDecoder()); - socketChannel.pipeline().addLast(this.handler); - } - -} +package org.apache.pulsar.io.netty.server; \ No newline at end of file diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/http/NettyHttpChannelInitializerTest.java similarity index 50% copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java copy to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/http/NettyHttpChannelInitializerTest.java index b5fa820..eae087b 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java +++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/http/NettyHttpChannelInitializerTest.java @@ -16,28 +16,30 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.netty.server; +package org.apache.pulsar.io.netty.http; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.bytes.ByteArrayDecoder; +import io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.pulsar.io.netty.NettySource; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; /** - * Netty Channel Initializer to register decoder and handler + * Tests for Netty Channel Initializer */ -public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { +public class NettyHttpChannelInitializerTest { - private ChannelInboundHandlerAdapter handler; + @Test + public void testChannelInitializer() throws Exception { + NioSocketChannel channel = new NioSocketChannel(); - public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) { - this.handler = handler; - } + NettyHttpChannelInitializer nettyChannelInitializer = new NettyHttpChannelInitializer( + new NettyHttpServerHandler(new NettySource()), null); + nettyChannelInitializer.initChannel(channel); - @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { - socketChannel.pipeline().addLast(new ByteArrayDecoder()); - socketChannel.pipeline().addLast(this.handler); + assertNotNull(channel.pipeline().toMap()); + assertEquals(2, channel.pipeline().toMap().size()); } } diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md index 05a43dc..c81ee98 100644 --- a/site2/docs/io-connectors.md +++ b/site2/docs/io-connectors.md @@ -17,4 +17,4 @@ Pulsar Functions cluster. - [RabbitMQ Source Connector](io-rabbitmq.md#source) - [Twitter Firehose Source Connector](io-twitter.md) - [CDC Source Connector based on Debezium](io-cdc.md) -- [Netty Tcp or Udp Source Connector](io-netty.md#source) +- [Netty Source Connector](io-netty.md#source) diff --git a/site2/docs/io-netty.md b/site2/docs/io-netty.md index 479752f..4b3d16b 100644 --- a/site2/docs/io-netty.md +++ b/site2/docs/io-netty.md @@ -6,7 +6,7 @@ sidebar_label: Netty Tcp or Udp Connector ## Source -The Netty Tcp or Udp Source connector is used to listen Tcp/Udp messages from Tcp/Udp Client and write them to user-defined Pulsar topic. +The Netty Source connector opens a port that accept incoming data via the configured network protocol and publish it to a user-defined Pulsar topic. Also, this connector is suggested to be used in a containerized (e.g. k8s) deployment. Otherwise, if the connector is running in process or thread mode, the instances may be conflicting on listening to ports. @@ -14,7 +14,7 @@ Otherwise, if the connector is running in process or thread mode, the instances | Name | Required | Default | Description | |------|----------|---------|-------------| -| `type` | `false` | `tcp` | The tcp or udp network protocol required by netty. | +| `type` | `false` | `tcp` | The network protocol over which data is trasmitted to netty. Valid values include HTTP, TCP, and UDP | | `host` | `false` | `127.0.0.1` | The host name or address that the source instance to listen on. | | `port` | `false` | `10999` | The port that the source instance to listen on. | | `numberOfThreads` | `false` | `1` | The number of threads of Netty Tcp Server to accept incoming connections and handle the traffic of the accepted connections. |