Repository: camel Updated Branches: refs/heads/master d1a1f7944 -> 04fa8fe52
CAMEL-10276: Update camel-syslog to use Netty4 Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/04fa8fe5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/04fa8fe5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/04fa8fe5 Branch: refs/heads/master Commit: 04fa8fe522f814763e30e0ce7deaf9bbfd743ab7 Parents: d1a1f79 Author: James Netherton <[email protected]> Authored: Wed Aug 31 14:18:06 2016 +0100 Committer: Andrea Cosentino <[email protected]> Committed: Wed Aug 31 15:49:30 2016 +0200 ---------------------------------------------------------------------- components/camel-syslog/pom.xml | 2 +- .../component/syslog/netty/Rfc5425Encoder.java | 35 +++++++++--------- .../syslog/netty/Rfc5425FrameDecoder.java | 39 +++++++++++--------- .../component/syslog/NettyDataFormatTest.java | 4 +- .../syslog/NettyManyUDPMessagesTest.java | 2 +- .../syslog/NettyRfc5425LongMessageTest.java | 9 +++-- .../component/syslog/NettyRfc5425Test.java | 4 +- .../syslog/applicationContext-Netty.xml | 2 +- .../features/src/main/resources/features.xml | 2 +- 9 files changed, 53 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/04fa8fe5/components/camel-syslog/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-syslog/pom.xml b/components/camel-syslog/pom.xml index 7d1ac77..2401642 100644 --- a/components/camel-syslog/pom.xml +++ b/components/camel-syslog/pom.xml @@ -42,7 +42,7 @@ <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-netty</artifactId> + <artifactId>camel-netty4</artifactId> </dependency> <!-- test dependencies --> http://git-wip-us.apache.org/repos/asf/camel/blob/04fa8fe5/components/camel-syslog/src/main/java/org/apache/camel/component/syslog/netty/Rfc5425Encoder.java 
---------------------------------------------------------------------- diff --git a/components/camel-syslog/src/main/java/org/apache/camel/component/syslog/netty/Rfc5425Encoder.java b/components/camel-syslog/src/main/java/org/apache/camel/component/syslog/netty/Rfc5425Encoder.java index 38eb574..01c79af 100644 --- a/components/camel-syslog/src/main/java/org/apache/camel/component/syslog/netty/Rfc5425Encoder.java +++ b/components/camel-syslog/src/main/java/org/apache/camel/component/syslog/netty/Rfc5425Encoder.java @@ -17,32 +17,33 @@ package org.apache.camel.component.syslog.netty; import java.nio.charset.Charset; +import java.util.List; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandler.Sharable; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageEncoder; -import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer; @Sharable -public class Rfc5425Encoder extends OneToOneEncoder { +public class Rfc5425Encoder extends MessageToMessageEncoder<ByteBuf> { @Override - protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { - if (!(msg instanceof ChannelBuffer)) { - return msg; - } + protected void encode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception { + if (byteBuf.isReadable()) { + int length = byteBuf.readableBytes(); - ChannelBuffer src = (ChannelBuffer) msg; - int length = src.readableBytes(); + String headerString = length + " "; - String headerString = length + " "; + ByteBuf header = ByteBufAllocator.DEFAULT.buffer(headerString.getBytes(Charset.forName("UTF8")).length); + 
header.writeBytes(headerString.getBytes(Charset.forName("UTF8"))); - ChannelBuffer header = channel.getConfig().getBufferFactory().getBuffer(src.order(), headerString.getBytes(Charset.forName("UTF8")).length); - header.writeBytes(headerString.getBytes(Charset.forName("UTF8"))); + Unpooled.buffer(); - return wrappedBuffer(header, src); + byteBuf.retain(); + out.add(Unpooled.wrappedBuffer(header, byteBuf)); + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/04fa8fe5/components/camel-syslog/src/main/java/org/apache/camel/component/syslog/netty/Rfc5425FrameDecoder.java ---------------------------------------------------------------------- diff --git a/components/camel-syslog/src/main/java/org/apache/camel/component/syslog/netty/Rfc5425FrameDecoder.java b/components/camel-syslog/src/main/java/org/apache/camel/component/syslog/netty/Rfc5425FrameDecoder.java index 15b2b5a..e384530 100644 --- a/components/camel-syslog/src/main/java/org/apache/camel/component/syslog/netty/Rfc5425FrameDecoder.java +++ b/components/camel-syslog/src/main/java/org/apache/camel/component/syslog/netty/Rfc5425FrameDecoder.java @@ -16,28 +16,33 @@ */ package org.apache.camel.component.syslog.netty; -import org.apache.camel.component.netty.ChannelHandlerFactory; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandler; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.handler.codec.frame.FrameDecoder; +import java.util.List; -public class Rfc5425FrameDecoder extends FrameDecoder implements ChannelHandlerFactory { +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import org.apache.camel.component.netty4.ChannelHandlerFactory; + +public class Rfc5425FrameDecoder extends 
ByteToMessageDecoder implements ChannelHandlerFactory { private Integer currentFramelength; @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (currentFramelength == null) { // find index of the first space, it should be after the length field - int index = indexOf(buffer, ChannelBuffers.wrappedBuffer(new byte[]{' '})); + int index = indexOf(in, Unpooled.wrappedBuffer(new byte[]{' '})); // Read part until the first space, if we have found one StringBuffer lengthbuffer = new StringBuffer(); if (index > -1) { - lengthbuffer.append(new String(buffer.readBytes(index).array())); + ByteBuf byteBuf = in.readBytes(index); + byte[] dest = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(dest); + lengthbuffer.append(new String(dest)); } int length; @@ -52,21 +57,21 @@ public class Rfc5425FrameDecoder extends FrameDecoder implements ChannelHandlerF // We have not found the length field, reset the buffer so we can // retry next time if (length < 0) { - buffer.resetReaderIndex(); - return null; + in.resetReaderIndex(); + return; } currentFramelength = length; } // Buffer does not contain enough data yet, wait until it does - if (buffer.readableBytes() < currentFramelength) { - return null; + if (in.readableBytes() < currentFramelength) { + return; } // read the message int lengthToRead = currentFramelength; currentFramelength = null; - return buffer.readBytes(lengthToRead); + out.add(in.readBytes(lengthToRead)); } /** @@ -74,7 +79,7 @@ public class Rfc5425FrameDecoder extends FrameDecoder implements ChannelHandlerF * between the readerIndex of the haystack and the first needle found in the * haystack. -1 is returned if no needle is found in the haystack. 
*/ - private static int indexOf(ChannelBuffer haystack, ChannelBuffer needle) { + private static int indexOf(ByteBuf haystack, ByteBuf needle) { for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i++) { int haystackIndex = i; int needleIndex; http://git-wip-us.apache.org/repos/asf/camel/blob/04fa8fe5/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyDataFormatTest.java ---------------------------------------------------------------------- diff --git a/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyDataFormatTest.java b/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyDataFormatTest.java index 0f18568..c621024 100644 --- a/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyDataFormatTest.java +++ b/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyDataFormatTest.java @@ -79,7 +79,7 @@ public class NettyDataFormatTest extends CamelTestSupport { mock2.expectedMessageCount(1); mock2.expectedBodiesReceived(message); - template.sendBody("netty:udp://127.0.0.1:" + serverPort + "?sync=false&allowDefaultCodec=false&useChannelBuffer=true", message); + template.sendBody("netty4:udp://127.0.0.1:" + serverPort + "?sync=false&allowDefaultCodec=false&useByteBuf=true", message); assertMockEndpointsSatisfied(); } @@ -93,7 +93,7 @@ public class NettyDataFormatTest extends CamelTestSupport { DataFormat syslogDataFormat = new SyslogDataFormat(); // we setup a Syslog listener on a random port. 
- from("netty:udp://127.0.0.1:" + serverPort + "?sync=false&allowDefaultCodec=false").unmarshal(syslogDataFormat) + from("netty4:udp://127.0.0.1:" + serverPort + "?sync=false&allowDefaultCodec=false").unmarshal(syslogDataFormat) .process(new Processor() { public void process(Exchange ex) { assertTrue(ex.getIn().getBody() instanceof SyslogMessage); http://git-wip-us.apache.org/repos/asf/camel/blob/04fa8fe5/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyManyUDPMessagesTest.java ---------------------------------------------------------------------- diff --git a/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyManyUDPMessagesTest.java b/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyManyUDPMessagesTest.java index 9598d29..4194ede 100644 --- a/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyManyUDPMessagesTest.java +++ b/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyManyUDPMessagesTest.java @@ -76,7 +76,7 @@ public class NettyManyUDPMessagesTest extends CamelTestSupport { DataFormat syslogDataFormat = new SyslogDataFormat(); // we setup a Syslog listener on a random port. 
- from("netty:udp://127.0.0.1:" + serverPort + "?sync=false&allowDefaultCodec=false").unmarshal(syslogDataFormat) + from("netty4:udp://127.0.0.1:" + serverPort + "?sync=false&allowDefaultCodec=false").unmarshal(syslogDataFormat) .process(new Processor() { public void process(Exchange ex) { assertTrue(ex.getIn().getBody() instanceof SyslogMessage); http://git-wip-us.apache.org/repos/asf/camel/blob/04fa8fe5/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyRfc5425LongMessageTest.java ---------------------------------------------------------------------- diff --git a/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyRfc5425LongMessageTest.java b/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyRfc5425LongMessageTest.java index 0b7166c..7325e0d 100644 --- a/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyRfc5425LongMessageTest.java +++ b/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyRfc5425LongMessageTest.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.syslog; +import io.netty.buffer.ByteBuf; + import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; @@ -26,7 +28,6 @@ import org.apache.camel.impl.JndiRegistry; import org.apache.camel.spi.DataFormat; import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit4.CamelTestSupport; -import org.jboss.netty.buffer.ChannelBuffer; import org.junit.BeforeClass; import org.junit.Test; @@ -50,7 +51,7 @@ public class NettyRfc5425LongMessageTest extends CamelTestSupport { @BeforeClass public static void initPort() { serverPort = AvailablePortFinder.getNextAvailable(); - uri = "netty:tcp://localhost:" + serverPort + "?sync=false&allowDefaultCodec=false&decoders=#decoder&encoder=#encoder"; + uri = "netty4:tcp://localhost:" + serverPort + 
"?sync=false&allowDefaultCodec=false&decoders=#decoder&encoder=#encoder"; } @Override @@ -92,8 +93,8 @@ public class NettyRfc5425LongMessageTest extends CamelTestSupport { assertTrue(ex.getIn().getBody() instanceof SyslogMessage); } }).to("mock:syslogReceiver").marshal(syslogDataFormat).to("mock:syslogReceiver2"); - // Here we need to turn the request body into channelbuffer - from("direct:start").convertBodyTo(ChannelBuffer.class).to(uri); + // Here we need to turn the request body into ByteBuf + from("direct:start").convertBodyTo(ByteBuf.class).to(uri); } }; } http://git-wip-us.apache.org/repos/asf/camel/blob/04fa8fe5/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyRfc5425Test.java ---------------------------------------------------------------------- diff --git a/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyRfc5425Test.java b/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyRfc5425Test.java index a2bb2ca..2815827 100644 --- a/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyRfc5425Test.java +++ b/components/camel-syslog/src/test/java/org/apache/camel/component/syslog/NettyRfc5425Test.java @@ -43,8 +43,8 @@ public class NettyRfc5425Test extends CamelTestSupport { @BeforeClass public static void initPort() { serverPort = AvailablePortFinder.getNextAvailable(); - uri = "netty:tcp://localhost:" + serverPort + "?sync=false&allowDefaultCodec=false&decoders=#decoder&encoder=#encoder"; - uriClient = uri + "&useChannelBuffer=true"; + uri = "netty4:tcp://localhost:" + serverPort + "?sync=false&allowDefaultCodec=false&decoders=#decoder&encoder=#encoder"; + uriClient = uri + "&useByteBuf=true"; } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/04fa8fe5/components/camel-syslog/src/test/resources/org/apache/camel/component/syslog/applicationContext-Netty.xml ---------------------------------------------------------------------- diff --git 
a/components/camel-syslog/src/test/resources/org/apache/camel/component/syslog/applicationContext-Netty.xml b/components/camel-syslog/src/test/resources/org/apache/camel/component/syslog/applicationContext-Netty.xml index 9b3a624..f3ccedd 100644 --- a/components/camel-syslog/src/test/resources/org/apache/camel/component/syslog/applicationContext-Netty.xml +++ b/components/camel-syslog/src/test/resources/org/apache/camel/component/syslog/applicationContext-Netty.xml @@ -31,7 +31,7 @@ </dataFormats> <route> - <from uri="netty:udp://localhost:{{server-port}}?sync=false&amp;allowDefaultCodec=false"/> + <from uri="netty4:udp://localhost:{{server-port}}?sync=false&amp;allowDefaultCodec=false"/> <unmarshal ref="mySyslog"/> <to uri="mock:stop1"/> <marshal ref="mySyslog"/> http://git-wip-us.apache.org/repos/asf/camel/blob/04fa8fe5/platforms/karaf/features/src/main/resources/features.xml ---------------------------------------------------------------------- diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml index 08826ef..51e1d60 100644 --- a/platforms/karaf/features/src/main/resources/features.xml +++ b/platforms/karaf/features/src/main/resources/features.xml @@ -1753,7 +1753,7 @@ </feature> <feature name='camel-syslog' version='${project.version}' resolver='(obr)' start-level='50'> <feature version='${project.version}'>camel-core</feature> - <feature version='${project.version}'>camel-netty</feature> + <feature version='${project.version}'>camel-netty4</feature> <bundle>mvn:org.apache.camel/camel-syslog/${project.version}</bundle> </feature> <feature name='camel-tagsoup' version='${project.version}' resolver='(obr)' start-level='50'>
